From: Dmitry Golubets
Date: Fri, 17 Feb 2017 20:17:16 +0000
Subject: Re: Performance tuning
To: user@flink.apache.org
Hi Daniel,

I've implemented a macro that generates MessagePack serializers in our codebase.
The resulting code is basically a series of writes/reads, like in hand-written structured serialization.

E.g. given
case class Data1(str: String, subdata: Data2)
case class Data2(num: Int)

the serialization code for Data1 will look like:

packer.packString(str)
packer.packInt(subdata.num)
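
To give a concrete picture, a hand-written equivalent of what such generated code might expand to, using the msgpack-core MessagePacker/MessageUnpacker API (a sketch; the helper names and the flattened field order are illustrative, not the actual macro output):

import org.msgpack.core.{MessagePacker, MessageUnpacker}

// Flatten Data1 into a stream of primitive writes; nested fields are inlined.
def packData1(packer: MessagePacker, d: Data1): Unit = {
  packer.packString(d.str)      // Data1.str
  packer.packInt(d.subdata.num) // nested Data2.num
}

// Read the fields back in the same order they were written.
def unpackData1(unpacker: MessageUnpacker): Data1 =
  Data1(unpacker.unpackString(), Data2(unpacker.unpackInt()))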

The data structures in our project are quite big (2-4 KB in JSON) and contain nested classes with many fields.
So custom serialization helps us avoid reflection and reduces the amount of data sent over the network.

However, it's worth mentioning that on small case classes Flink's default serialization works faster.


Best regards,
Dmitry

On Fri, Feb 17, 2017 at 6:01 PM, Daniel Santos <dsantos@cryptolab.net> wrote:

Hello Dmitry,

Could you please elaborate on your tuning of environment.addDefaultKryoSerializer(..)?

I'm interested in knowing what you have done there for a boost of about 50%.

A small or simple example would be very nice.

Thank you very much in advance.

Kind Regards,

Daniel Santos


On 02/17/2017 12:43 PM, Dmitry Golubets wrote:
Hi,

My streaming job cannot benefit much from parallelization, unfortunately.
So I'm looking for things I can tune in Flink to make it process a sequential stream faster.

So far, in our current engine based on Akka Streams (non-distributed, of course) we get 20k msg/sec.
Ported to Flink, I'm getting 14k so far.

My observations are the following:
  • if I chain operations together, they all execute in sequence, so I basically sum up the time required to process one data item across all my stream operators, which is not good
  • if I split chains, they execute asynchronously to each other, but there is serialization and network overhead

The second approach gives me better results, considering that I have a server with more than enough memory and cores to do all the side work for serialization. But I want to reduce this serialization/data transfer overhead to a minimum.
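
For reference, this splitting can be controlled per operator in the DataStream API; a minimal sketch with a made-up pipeline (the pipeline itself is illustrative):

import org.apache.flink.streaming.api.scala._

object ChainingSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // env.disableOperatorChaining() // global switch: never chain operators

    env.fromElements("a", "bb", "ccc")
      .map(_.length)   // chained with the source by default
      .startNewChain() // this map starts a new chain, cut off from upstream
      .filter(_ > 1)   // chained together with the map above
      .print()

    env.execute("chaining sketch")
  }
}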

So what I have now:

environment.getConfig.enableObjectReuse() // since it's Scala, we don't need unnecessary copies
environment.getConfig.disableAutoTypeRegistration() // it works faster with it, I'm not sure why
environment.addDefaultKryoSerializer(..) // custom MessagePack serialization for all message types, gives about a 50% boost
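
A sketch of what that last registration can look like end to end, reusing the Data1/Data2 classes from above (the serializer class name and the length-prefix framing are illustrative assumptions, not the actual macro output):

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.msgpack.core.MessagePack

// Hypothetical Kryo serializer that delegates to msgpack-core for Data1.
class Data1MsgPackSerializer extends Serializer[Data1] with Serializable {
  override def write(kryo: Kryo, output: Output, d: Data1): Unit = {
    val packer = MessagePack.newDefaultBufferPacker()
    packer.packString(d.str)
    packer.packInt(d.subdata.num)
    val bytes = packer.toByteArray // flushes and returns the encoded bytes
    packer.close()
    output.writeInt(bytes.length)  // length prefix so read() knows how much to consume
    output.writeBytes(bytes)
  }

  override def read(kryo: Kryo, input: Input, clazz: Class[Data1]): Data1 = {
    val bytes = input.readBytes(input.readInt())
    val unpacker = MessagePack.newDefaultUnpacker(bytes)
    try Data1(unpacker.unpackString(), Data2(unpacker.unpackInt()))
    finally unpacker.close()
  }
}

environment.addDefaultKryoSerializer(classOf[Data1], classOf[Data1MsgPackSerializer])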

But that's it, I don't know what else to do.
I didn't find any interesting network/buffer settings in the docs.
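
For what it's worth, one buffer-related knob that does exist is the output buffer flush timeout (a sketch; the 5 ms value is arbitrary):

// How long to wait before shipping a non-full network buffer downstream;
// lower values favor latency, higher values favor throughput.
environment.setBufferTimeout(5)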

Best regards,
Dmitry

