beam-user mailing list archives

From kaniska Mandal <kaniska.man...@gmail.com>
Subject Re: Questions on Beam Pipeline management , monitoring
Date Tue, 26 Apr 2016 06:12:36 GMT
Hi Max,

Many thanks for the great explanations.

*A few questions regarding the execution strategy for a group of similar /
disparate Beam pipelines*

> Is it feasible to maintain a load balancer in front of the
'Beam-Flink pipeline executor' process in order to control rate limits /
throttling?
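To make the throttling question concrete, here is a rough sketch of the kind of rate limiter I have in mind, as plain Java (the name SimpleThrottle is made up for illustration; it is not a Beam or Flink API):

```java
// Illustrative sketch: a minimal blocking token bucket that could sit in
// front of a record-producing loop, as an in-process alternative to an
// external load balancer. Not production code.
public class SimpleThrottle {
    private final double permitsPerSecond;
    private double available;        // tokens currently in the bucket
    private long lastRefillNanos;

    public SimpleThrottle(double permitsPerSecond) {
        this.permitsPerSecond = permitsPerSecond;
        this.available = permitsPerSecond;   // start with a full bucket
        this.lastRefillNanos = System.nanoTime();
    }

    /** Blocks until one permit is available, then consumes it. */
    public synchronized void acquire() throws InterruptedException {
        refill();
        while (available < 1.0) {
            Thread.sleep(10);
            refill();
        }
        available -= 1.0;
    }

    // Add tokens proportional to elapsed time, capped at one second's worth.
    private void refill() {
        long now = System.nanoTime();
        double elapsedSeconds = (now - lastRefillNanos) / 1e9;
        available = Math.min(permitsPerSecond,
                available + elapsedSeconds * permitsPerSecond);
        lastRefillNanos = now;
    }
}
```

A load balancer in front of the executor would achieve something similar externally; this just shows the behavior I am after.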

> Should we maintain multiple containers of Beam-Flink pipelines, or a
single container with multiple pipeline instances?

> Is it a good idea to use an external process-flow controller such as
Apache NiFi to wire the Beam pipelines together and launch / halt / resume
them programmatically or interactively?

*More questions related to graceful shutdown and restart*

Currently, FlinkPipelineRunner#run() throws a RuntimeException on failure.

> So is it good enough to add a shutdown hook on the Beam pipeline and close
resources such as a KafkaProducer when the pipeline job is killed by a
RuntimeException? It would be better if a custom exception were thrown,
so that the job could handle it gracefully.
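For clarity, here is roughly what I mean by a shutdown hook, as a plain-Java sketch (CloseableResource stands in for the real Kafka producer; nothing here is Beam API):

```java
// Illustrative sketch: a JVM shutdown hook that closes an external
// resource (e.g. a Kafka producer) when the process exits, including
// exits triggered by an uncaught RuntimeException from run().
// Note: shutdown hooks do NOT run on kill -9.
public class ShutdownHookExample {
    static class CloseableResource implements AutoCloseable {
        volatile boolean closed = false;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        CloseableResource producer = new CloseableResource();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Runs as the JVM shuts down, whether exit was clean or not.
            producer.close();
        }));
        // ... run the pipeline here; if run() throws, the hook still
        // fires when the JVM terminates.
    }
}
```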

By the way, I tried calling close() on the FlinkKafkaProducer from inside
the Beam-Flink pipeline runner, but the producer didn't stop.

> I understand we can use monit to restart a process, but do you have any
suggestions for implementing an external 'Beam pipeline monitoring agent'
that auto-retries restarting the pipeline?
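Something along these lines is what I picture for the monitoring agent, sketched in plain Java (RetrySupervisor and Job are illustrative names; launching the actual Beam pipeline is left as a stand-in):

```java
// Illustrative sketch: a small supervisor loop that retries a failing
// job with exponential backoff. Job.run() would wrap the actual
// pipeline launch; nothing here is Beam or Flink API.
public class RetrySupervisor {
    interface Job { void run() throws Exception; }

    /** Returns true if the job eventually succeeded within maxAttempts. */
    static boolean runWithRetries(Job job, int maxAttempts, long baseDelayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                job.run();
                return true;
            } catch (Exception e) {
                if (attempt == maxAttempts) return false;
                // Exponential backoff: base * 2^(attempt-1)
                Thread.sleep(baseDelayMillis << (attempt - 1));
            }
        }
        return false;
    }
}
```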

Let me know if any of the above points sound reasonable; if so, I'll go
ahead and create a feature request.

Thanks,
Kaniska

On Mon, Apr 25, 2016 at 11:12 AM, Maximilian Michels <mxm@apache.org> wrote:

> Hi Kaniska,
>
> Not all of these are uniform across all Runners yet but since you have
> previously deployed applications with the Flink Runner, here are my
> answers from a Flink perspective.
>
> ** Shutdown **
>
> For shutting down a Flink pipeline, you can use the "cancel" function:
>
> >./bin/flink cancel <jobId>
>
> When you submit your job in detached mode, e.g., ./bin/flink run -d
> /path/to/jar you get a job id in return which you can use for the
> cancel command. Alternatively, query the running jobs via
>
> >./bin/flink list
>
> Very soon we will have checkpointing of sources/sinks in Beam. That
> would enable you to use Flink's Savepoint feature. Savepoints allow
> you to take a snapshot of your Flink job at a given point in time,
> shut down your application, and resume execution from that snapshot
> later. This works for Flink but not yet for Beam programs.
>
> ** Scheduling **
>
> You'll have to set up a cron job or an external scheduling program to
> run a Flink job at a specified time. There is no built-in pipeline
> scheduling in Flink.
>
> ** Monitoring **
>
> Flink has a nice web interface available on port 8081 of the job
> manager (master) node. It contains statistics like the number of
> records read/written per operator and JVM metrics.
>
> You may also register "Accumulators" which enable you to provide your
> own metrics. In the Beam API these are called "Aggregators".
> Aggregators get translated to Flink accumulators. For instance, you
> can have an aggregator that counts the number of records written by a
> particular operator.
>
> You can see these metrics on the web interface or access them via the
> Flink Rest API:
>
> https://ci.apache.org/projects/flink/flink-docs-master/internals/monitoring_rest_api.html
>
> Here is an example of an aggregator in Beam which counts the number of
> elements processed:
>
> public class TestAggregator {
>
>   public static void main(String[] args) throws AggregatorRetrievalException {
>
>     class MyDoFn extends DoFn<Integer, Integer> {
>
>       // Aggregators become Flink accumulators; this one sums 1 per element.
>       Aggregator<Long, Long> agg = createAggregator("numRecords", new Sum.SumLongFn());
>
>       @Override
>       public void processElement(ProcessContext c) throws Exception {
>         agg.addValue(1L);
>       }
>     }
>
>     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     options.setRunner(FlinkPipelineRunner.class);
>     options.setStreaming(true);
>
>     Pipeline pipeline = Pipeline.create(options);
>
>     MyDoFn myDoFn = new MyDoFn();
>     pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(myDoFn));
>
>     PipelineResult result = pipeline.run();
>
>     // Read back the accumulated count after the pipeline finishes.
>     System.out.println("Result: " + result.getAggregatorValues(myDoFn.agg).getValues());
>   }
> }
>
> As expected, this prints [3].
>
>
> Cheers,
> Max
>
> On Mon, Apr 25, 2016 at 7:25 PM, kaniska Mandal
> <kaniska.mandal@gmail.com> wrote:
> > What's the recommended approach
> >
> >> to reliably shut down the pipeline
> >
> >> to run the beam-flink pipeline on a schedule
> >
> >> to monitor the rates / throughput / throttling / multiple threads
> >> spawned by the pipeline - any suggestions?
> >
> > Thanks
> > Kaniska
>
