From: Raghu Angadi
Date: Wed, 24 May 2017 10:50:01 -0700
Subject: Re: How to decrease latency when using PubsubIO.Read?
To: user@beam.apache.org

Josh,

Can you share your job_id? I could take a look. Are you measuring latency
end-to-end (from the publisher to when the record appears in Bigtable)?
Are you using BigtableIO for the sink?

There is no easy way to use more workers when autoscaling is enabled. It
decides that your backlog and CPU utilisation are low enough that it does
not need to scale.

Raghu.

On Wed, May 24, 2017 at 10:14 AM, Josh <jofo90@gmail.com> wrote:

> Thanks Ankur, that's super helpful! I will give these optimisations a go.
>
> About the "No operations completed" message - there are a few of these in
> the logs (but very few, like one an hour), so there is probably no need
> to scale up Bigtable.
> I did however see a lot of INFO messages saying "Wrote 0 records" in the
> logs. Probably about 50% of the "Wrote n records" messages are zero, while
> the other 50% are quite high (e.g. "Wrote 80 records"). Not sure if that
> could indicate a bad setting?
>
> Josh
>
> On Wed, May 24, 2017 at 5:22 PM, Ankur Chauhan <ankur@malloc64.com> wrote:
>
>> There are two main things to look at here:
>>
>> * In the logs, are there any messages like "No operations completed
>> within the last 61 seconds. There are still 1 simple operations and 1
>> complex operations in progress."? This means you are underscaled on the
>> Bigtable side and would benefit from increasing the node count.
>> * We also saw some improvement in performance (workload dependent) by
>> going to a bigger worker machine type.
>> * Another optimization that worked for our use case is the options below:
>>
>> // Streaming Dataflow has larger machines with smaller bundles, so we can
>> // queue up a lot more without blowing up.
>> private static BigtableOptions createStreamingBTOptions(AnalyticsPipelineOptions opts) {
>>     return new BigtableOptions.Builder()
>>         .setProjectId(opts.getProject())
>>         .setInstanceId(opts.getBigtableInstanceId())
>>         .setUseCachedDataPool(true)
>>         .setDataChannelCount(32)
>>         .setBulkOptions(new BulkOptions.Builder()
>>             .setUseBulkApi(true)
>>             .setBulkMaxRowKeyCount(2048)
>>             .setBulkMaxRequestSize(8_388_608L)
>>             .setAsyncMutatorWorkerCount(32)
>>             .build())
>>         .build();
>> }
>>
>> There is a lot of trial and error involved in getting the end-to-end
>> latency down, so I would suggest enabling profiling using the
>> --saveProfilesToGcs option to get a sense of what exactly is happening.
>>
>> -- Ankur Chauhan
>>
>> On May 24, 2017, at 9:09 AM, Josh <jofo90@gmail.com> wrote:
>>
>> Ah ok - I am using the Dataflow runner. I didn't realise the custom
>> implementation was provided at runtime...
>>
>> Any ideas of how to tweak my job to either lower the latency consuming
>> from PubSub or to lower the latency in writing to Bigtable?
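
To make Ankur's snippet concrete: a minimal sketch of wiring those tuned
BigtableOptions into the sink, assuming the Beam 2.0.0 BigtableIO API
(withBigtableOptions / withTableId). It reuses Ankur's
createStreamingBTOptions() and AnalyticsPipelineOptions from above; the
"events" table id and the "mutations" collection are hypothetical names,
and BigtableIO.write() expects elements of type
KV<ByteString, Iterable<Mutation>>.

    import com.google.bigtable.v2.Mutation;
    import com.google.protobuf.ByteString;
    import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;

    // Attach the tuned BigtableOptions to the sink. "events" is a hypothetical
    // table id; "mutations" is whatever upstream transform produces the
    // KV<ByteString, Iterable<Mutation>> elements that BigtableIO.write() expects.
    static void writeToBigtable(PCollection<KV<ByteString, Iterable<Mutation>>> mutations,
                                AnalyticsPipelineOptions opts) {
      mutations.apply("WriteToBigtable",
          BigtableIO.write()
              .withBigtableOptions(createStreamingBTOptions(opts))
              .withTableId("events"));
    }
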
>>
>> On Wed, May 24, 2017 at 4:14 PM, Lukasz Cwik <lcwik@google.com> wrote:
>>
>>> What runner are you using (Flink, Spark, Google Cloud Dataflow, Apex, ...)?
>>>
>>> On Wed, May 24, 2017 at 8:09 AM, Ankur Chauhan <ankur@malloc64.com> wrote:
>>>
>>>> Sorry, that was an autocorrect error. I meant to ask - what Dataflow
>>>> runner are you using? If you are using Google Cloud Dataflow then the
>>>> PubsubIO class is not the one doing the reading from the Pubsub topic.
>>>> They provide a custom implementation at run time.
>>>>
>>>> Ankur Chauhan
>>>> Sent from my iPhone
>>>>
>>>> On May 24, 2017, at 07:52, Josh <jofo90@gmail.com> wrote:
>>>>
>>>> Hi Ankur,
>>>>
>>>> What do you mean by runner address?
>>>> Would you be able to link me to the comment you're referring to?
>>>>
>>>> I am using the PubsubIO.Read class from Beam 2.0.0 as found here:
>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>> On Wed, May 24, 2017 at 3:36 PM, Ankur Chauhan <ankur@malloc64.com> wrote:
>>>>
>>>>> What runner address you using? Google Cloud Dataflow uses a closed-source
>>>>> version of the Pubsub reader, as noted in a comment on the Read class.
>>>>>
>>>>> Ankur Chauhan
>>>>> Sent from my iPhone
>>>>>
>>>>> On May 24, 2017, at 04:05, Josh <jofo90@gmail.com> wrote:
>>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm using PubsubIO.Read to consume a Pubsub stream, and my job then
>>>>> writes the data out to Bigtable. I'm currently seeing a latency of 3-5
>>>>> seconds between a message being published and being written to Bigtable.
>>>>>
>>>>> I want to try and decrease the latency to <1s if possible - does
>>>>> anyone have any tips for doing this?
>>>>>
>>>>> I noticed that there is a PubsubGrpcClient
>>>>> https://github.com/apache/beam/blob/release-2.0.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java
>>>>> however the PubsubUnboundedSource is initialised with a PubsubJsonClient,
>>>>> so the gRPC client doesn't appear to be used. Is there a way to switch to
>>>>> the gRPC client, as perhaps that would give better performance?
>>>>>
>>>>> Also, I am running my job on Dataflow using autoscaling, which has only
>>>>> allocated one n1-standard-4 instance to the job, which is running at
>>>>> ~50% CPU. Could forcing a higher number of nodes help improve latency?
>>>>>
>>>>> Thanks for any advice,
>>>>> Josh
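
On the autoscaling question at the end of the thread: if a fixed, larger
worker pool is worth trying, below is a minimal sketch of the relevant
Dataflow runner settings, assuming the standard DataflowPipelineOptions /
DataflowPipelineWorkerPoolOptions interfaces. The worker count and machine
type are illustrative only, and the --saveProfilesToGcs flag Ankur mentions
can be passed on the command line alongside these.

    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public static void main(String[] args) {
      // Pin a fixed worker pool instead of relying on autoscaling, and pick a
      // bigger machine type. The values here are illustrative, not a recommendation.
      DataflowPipelineOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
      options.setStreaming(true);
      options.setWorkerMachineType("n1-standard-8");
      options.setNumWorkers(4);
      options.setMaxNumWorkers(4);
      options.setAutoscalingAlgorithm(
          DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.NONE);
      // Build and run the Pubsub-to-Bigtable pipeline with these options as
      // usual, e.g. Pipeline p = Pipeline.create(options); ... p.run();
    }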