Subject: Re: Questions on Beam Pipeline management, monitoring
From: kaniska Mandal
To: user@beam.incubator.apache.org
Date: Mon, 25 Apr 2016 23:12:36 -0700

Hi Max,

Many thanks for the
great explanations.

*A few questions regarding 'Execution Strategy of a Group of similar / disparate Beam Pipelines'*

> Is it a feasible idea to maintain a Load Balancer in front of the 'Beam-Flink Pipeline Executor Process' in order to control rate limits / throttling?

> Should we maintain multiple containers of 'Beam-Flink Pipelines', or a single container with multiple instances of a Beam-Flink Pipeline?

> Is it a good idea to use an external 'Process Flow Controller' like Apache NiFi to wire the Beam Pipelines and launch / halt / resume them programmatically / interactively?

*More questions related to graceful shutdown and restart*

Currently FlinkPipelineRunner#run() throws a RuntimeException.

> So is it good enough to add a shutdown hook on the Beam Pipeline and close resources like a KafkaProducer when the Pipeline Job is killed due to a RuntimeException? It would have been better if a custom exception were thrown, so that the Job could handle it gracefully!

BTW, I tried calling close() on FlinkKafkaProducer from inside the Beam-Flink Pipeline Runner, but the producer didn't stop.

> I understand we can use monit to restart a process, but any suggestion on implementing an external 'Beam Pipeline Monitoring Agent' that auto-retries restarting the Pipeline?

Let me know if any of the above points sound logical; then I'll go ahead and create a feature request.

Thanks,
Kaniska

On Mon, Apr 25, 2016 at 11:12 AM, Maximilian Michels <mxm@apache.org> wrote:
> Hi Kaniska,
>
> Not all of these are uniform across all Runners yet, but since you have
> previously deployed applications with the Flink Runner, here are my
> answers from a Flink perspective.
>
> ** Shutdown **
>
> For shutting down a Flink pipeline, you can use the "cancel" function:
>
>   ./bin/flink cancel <jobId>
>
> When you submit your job in detached mode, e.g. ./bin/flink run -d
> /path/to/jar, you get a job id in return which you can use for the
> cancel command.
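The graceful-shutdown question above (closing resources like a KafkaProducer when the runner surfaces a RuntimeException) can be sketched runner-independently. The following is a minimal sketch only: `FakeProducer` and `runPipeline()` are hypothetical stand-ins for a real producer and for `FlinkPipelineRunner#run()`, not actual Beam or Kafka API.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class GracefulShutdown {

    /** Stand-in for a closeable resource such as a KafkaProducer (hypothetical). */
    static class FakeProducer implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);
        @Override
        public void close() { closed.set(true); }
    }

    /** Stand-in for pipeline.run(), which currently surfaces a RuntimeException. */
    static void runPipeline() {
        throw new RuntimeException("job killed");
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();
        // Fallback: a JVM shutdown hook closes the resource on abrupt exit (e.g. SIGTERM).
        Runtime.getRuntime().addShutdownHook(new Thread(producer::close));
        try {
            runPipeline();
        } catch (RuntimeException e) {
            // No custom exception type exists yet, so RuntimeException is caught directly.
            System.out.println("pipeline failed: " + e.getMessage());
        } finally {
            producer.close(); // deterministic cleanup on the normal exit path
        }
        System.out.println("producer closed: " + producer.closed.get());
    }
}
```

Since the shutdown hook only runs at JVM exit, the finally block does the deterministic cleanup; the hook is just a safety net.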
> Alternatively, query the running jobs via
>
>   ./bin/flink list
>
> Very soon we will have checkpointing of sources/sinks in Beam. That
> would enable you to use Flink's Savepoint feature. Savepoints allow
> you to take a snapshot of your Flink job at a moment in time,
> shut down your application, and resume execution from that snapshot
> later. This works for Flink but not yet for Beam programs.
>
> ** Scheduling **
>
> You'll have to set up a cron job or an external scheduling program to
> run a Flink job at a specified time. There is no built-in pipeline
> scheduling in Flink.
>
> ** Monitoring **
>
> Flink has a nice web interface available on port 8081 of the job
> manager (master) node. It contains statistics like the number of
> records read/written per operator and JVM metrics.
>
> You may also register "Accumulators" which enable you to provide your
> own metrics. In the Beam API these are called "Aggregators".
> Aggregators get translated to Flink accumulators. For instance, you
> can have an aggregator that counts the number of records written by a
> particular operator.
> You can see these metrics on the web interface or access them via the
> Flink REST API:
>
> https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html
>
> Here is an example of an aggregator in Beam which counts the number of
> elements processed:
>
> public class TestAggregator {
>
>   public static void main(String[] args) throws AggregatorRetrievalException {
>
>     class MyDoFn extends DoFn<Integer, Integer> {
>
>       Aggregator<Long, Long> agg =
>           createAggregator("numRecords", new Sum.SumLongFn());
>
>       @Override
>       public void processElement(ProcessContext c) throws Exception {
>         agg.addValue(1L);
>       }
>     }
>
>     FlinkPipelineOptions options =
>         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     options.setRunner(FlinkPipelineRunner.class);
>     options.setStreaming(true);
>
>     Pipeline pipeline = Pipeline.create(options);
>
>     MyDoFn myDoFn = new MyDoFn();
>     pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(myDoFn));
>
>     PipelineResult result = pipeline.run();
>
>     System.out.println("Result: " +
>         result.getAggregatorValues(myDoFn.agg).getValues());
>   }
> }
>
> As expected, this prints [3].
>
>
> Cheers,
> Max
>
> On Mon, Apr 25, 2016 at 7:25 PM, kaniska Mandal
> <kaniska.mandal@gmail.com> wrote:
> > What's the recommended approach
> >
> >> to reliably shut down the pipeline
> >
> >> to run the beam-flink pipeline in a scheduled manner
> >
> >> to monitor the rates/throughputs/throttling/multiple threads spawned
> >> by the Pipeline, any suggestion?
> >
> > Thanks
> > Kaniska
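The cron-based scheduling Max mentions might look like the following crontab fragment. All paths, the jar name, and the schedule are purely illustrative assumptions, not values from this thread:

```cron
# Illustrative: submit the Beam-on-Flink job in detached mode every day at 02:30.
30 2 * * * /opt/flink/bin/flink run -d /opt/jobs/beam-pipeline.jar >> /var/log/beam-cron.log 2>&1
```

Redirecting stdout/stderr to a log file keeps the job id printed by detached mode available for a later `./bin/flink cancel`.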