From: Thomas Groh <tgroh@google.com>
Date: Mon, 8 Aug 2016 15:50:57 -0700
Subject: Re: Is Beam pipeline runtime behavior inconsistent?
To: user@beam.incubator.apache.org, amir bahmanyari <amirtousa@yahoo.com>

You would see performance no better than single-threaded behavior if you group everything into a single key, which is why this approach is strongly discouraged. You can still get continuous output, depending on the triggering, but you lose all of the scaling benefits of running a pipeline as opposed to a simple Java program, and you may incur some additional overhead.

To enforce this sort of threading you would do something along the lines of:

    kafkarecords.apply(WithKeys.<Integer, String>of(1))
        .apply(GroupByKey.<Integer, String>create())
        .apply(Values.<Iterable<String>>create())
        .apply(ParDo.of(new DoFn<Iterable<String>, String>() {...}));

where the DoFn unrolls its input and applies the processing to each element.
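For illustration, a minimal sketch of such an unrolling DoFn, in the same pre-annotation DoFn style used elsewhere in this thread (processRecord is a hypothetical stand-in for the per-record logic):

    DoFn<Iterable<String>, String> unrollFn = new DoFn<Iterable<String>, String>() {
      public void processElement(ProcessContext c) throws Exception {
        // All records share the single key, so they arrive here as one Iterable
        // and are handled one at a time by this DoFn instance.
        for (String record : c.element()) {
          c.output(processRecord(record)); // hypothetical per-record helper
        }
      }
    };

Because every record shares the key, one DoFn instance sees the whole Iterable for a window pane, which is what makes the processing effectively single-threaded.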
On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:

> Thanks so much Thomas.
> Fantastic answer & great learning about what's really going on underneath the hood.
> Have a question on your suggestion: "To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing so defeat the "real-time streaming" objectives?
> To me, the above leads to a simulation of a simple single-threaded Java process, but executing in a massively parallel infrastructure in a "fancy" way :-)
> Is there an example that demonstrates how to actually implement your suggestion above without any hidden loopholes, please?
> I can at least try it and see how far it gets for R&D purposes & share the results with the community.
> Cheers + have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <tgroh@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a time. This is part of the design of ParDo to work efficiently across multiple processes and machines [1], where multiple instances of a DoFn must exist in order for progress to be made in a timely fashion. This includes processing the same element across multiple machines at the same time, with only one of the results being available in the output PCollection, as well as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded manner; however, it is permitted to have multiple different DoFn instances active within a single process and across processes at any given time (for the same reasons as above). There's no support in the Beam model to restrict this type of execution. We do not encourage sharing objects between DoFn instances; any shared state must be accessed in a thread-safe manner, and modifications to shared state should be idempotent, as otherwise retries and speculative execution may cause that state to be inconsistent. A DoFn will be reused for multiple elements within a single bundle, and may be reused across multiple bundles - if you require the DoFn to be "fresh" per element, it should perform any required setup at the start of the processElement method.
>
> The best that can be done, if it is absolutely required to restrict processing to a single element at a time, would be to group all of the elements to a single key. Note that this will not solve the problem in all cases, as a runner is permitted to execute the group of elements multiple times so long as it only takes one completed bundle as the result; additionally, this removes the ability of the runner to balance work and introduces a performance bottleneck. To do so, you would key the inputs to a single static key and apply a GroupByKey, running the processing method on the output Iterable produced by the GroupByKey (directly; expanding the input Iterable into a separate PCollection allows a runner to rebalance the elements, which will reintroduce parallelism).
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
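A minimal sketch of those two recommendations, not from the original thread: per-element setup done at the start of processElement, and shared state touched only through thread-safe, idempotent operations (RecordState and the map name are hypothetical):

    // java.util.concurrent.ConcurrentHashMap; shared across DoFn instances in this JVM.
    static final ConcurrentMap<String, RecordState> SHARED = new ConcurrentHashMap<>();

    DoFn<String, String> fn = new DoFn<String, String>() {
      public void processElement(ProcessContext c) throws Exception {
        // "Fresh" per element: any setup the element needs happens here, not in
        // instance fields populated once per DoFn instance.
        RecordState state = new RecordState(c.element()); // hypothetical value type
        // Idempotent update: putIfAbsent leaves the map in the same state even if a
        // retry or speculative execution processes this element a second time.
        SHARED.putIfAbsent(c.element(), state);
        c.output(c.element());
      }
    };

putIfAbsent (and similar compute-style operations) keeps the shared map consistent even when the same element is processed more than once.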
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
>
> You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
> ==>> I send records to Kafka from my laptop. I use KafkaIO() to receive the records. I have confirmed that I don't get duplicates from Kafka. However, for some reason, certain parts of my code execute beyond the actual number of expected records, and subsequently produce extra resulting data.
> I tried playing with the triggering - stretching the window interval, discardingFiredPanes(), etc., all kinds of modes. Same result. How can I guarantee that one record at a time executes in one unique instance of the inner-class object?
> I have all the shared objects synchronized and am using Java concurrent hashmaps. How can I guarantee synchronized operations amongst "parallel pipelines"? Analogous to multiple threads accessing a shared object and trying to modify it...
>
> Here is my current KafkaIO() call:
>
>     PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>             .withBootstrapServers("kafkahost:9092")
>             .withTopics(topics)
>             .withValueCoder(StringUtf8Coder.of())
>             .withoutMetadata())
>         .apply(Values.<String>create())
>         .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>             .triggering(AfterWatermark.pastEndOfWindow())
>             .withAllowedLateness(Duration.ZERO)
>             .discardingFiredPanes());
>
>     kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() { ... // I expect one record at a time to one object here
>
> Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
> ==>> No duplicates from Kafka.
>
> Additionally, I'm not sure what you mean by "executes till a record lands on method"
> ==>> Sorry for my confusing statement. Like I mentioned above, I expect each record coming from Kafka gets assigned to one instance of the inner class, and therefore one instance of the pipeline executes it in parallel with others executing their own unique records.
>
> Additionally additionally, is this reproducible if you execute with the DirectRunner?
> ==>> I have not tried DirectRunner. Should I?
>
> Thanks so much Thomas.
>
> ------------------------------
> *From:* Thomas Groh <tgroh@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands on method"
>
> Additionally additionally, is this reproducible if you execute with the DirectRunner?
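A rough sketch of those two checks, assuming the Beam 0.x APIs used in this thread (the direct runner's class and module names may differ between versions):

    // Per-window count to compare against the number of records published to Kafka;
    // withoutDefaults() because kafkarecords is in fixed windows, not the global window.
    PCollection<Long> counts =
        kafkarecords.apply(Count.<String>globally().withoutDefaults());

    counts.apply(ParDo.of(new DoFn<Long, Void>() {
      public void processElement(ProcessContext c) throws Exception {
        System.out.println("records counted in pane: " + c.element());
      }
    }));

    // Running the same pipeline locally (e.g. with the DirectRunner from the
    // beam-runners-direct-java module) helps show whether the extra output is
    // runner-specific.
    PipelineOptions directOptions = PipelineOptionsFactory.create();
    directOptions.setRunner(DirectRunner.class);
    Pipeline local = Pipeline.create(directOptions);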
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing. I think I did.
> My core code works perfectly & produces the expected result every single time without wrapping it with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated it and it always produced the right result.
> With a Beam KafkaIO included and the exact same code embedded in an anonymous class running Beam pipelines, I get a different result every time I rerun it.
> Below is the snippet from where KafkaIO executes until a record lands on the processing method.
> Kafka sends a precise number of records. No duplicates. All good.
> While executing in Beam, when the records are finished & I expect a correct result, it always produces something different.
> Different in different runs.
> I appreciate shedding light on this issue. And thanks for your valuable time as always.
> Amir-
>
>     public static synchronized void main(String[] args) throws Exception {
>
>         // Create Beam options for the Flink runner.
>         FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>         // Set the streaming engine as FlinkRunner
>         options.setRunner(FlinkPipelineRunner.class);
>         // This is a streaming process (as opposed to batch)
>         options.setStreaming(true);
>         // Create the DAG pipeline for parallel processing of independent LR records
>         Pipeline p = Pipeline.create(options);
>         // Kafka broker topic is identified as "lroad"
>         List<String> topics = Arrays.asList("lroad");
>
>         PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>                 .withBootstrapServers("kafkahost:9092")
>                 .withTopics(topics)
>                 .withValueCoder(StringUtf8Coder.of())
>                 .withoutMetadata())
>             .apply(Values.<String>create())
>             .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>                 .triggering(AfterWatermark.pastEndOfWindow())
>                 .withAllowedLateness(Duration.ZERO)
>                 .accumulatingFiredPanes());
>
>         kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new DoFn<String, String>() {
>             public void processElement(ProcessContext ctx) throws Exception {
>                 // My core logic code here.
>             }
>         }));
>         .
>         .
>         p.run(); // Start Beam pipeline(s) in the Flink cluster
>     } // of main
>     } // of class
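For what it's worth, the two versions of the windowing code in this thread differ in their pane accumulation mode; a side-by-side sketch of the two choices, with their usual semantics (worth double-checking against the Beam docs for the version in use):

    // Emits only the elements that arrived since the last firing of the
    // trigger for this window.
    .discardingFiredPanes()

    // Re-emits everything seen so far in the window on each firing, so later
    // panes repeat earlier elements - downstream logic may see some elements
    // more than once.
    .accumulatingFiredPanes()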