beam-user mailing list archives

From Thomas Groh <tg...@google.com>
Subject Re: Is Beam pipeline runtime behavior inconsistent?
Date Mon, 08 Aug 2016 22:50:57 GMT
You would perform no better than single-threaded execution if you group
everything into a single key, which is why this approach is strongly
discouraged. You can still get continuous output, depending on the
triggering, but you lose all of the scaling benefits of running a pipeline as
opposed to a simple Java program, and you may incur some additional overhead.

To enforce this sort of threading, you would do something along the lines of:

kafkarecords.apply(WithKeys.<Integer, String>of(1))
    .apply(GroupByKey.<Integer, String>create())
    .apply(Values.<Iterable<String>>create())
    .apply(ParDo.of(new DoFn<Iterable<String>, String>() {...}));

Where the DoFn unrolls its input and applies the processing to each element.
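
For illustration, a minimal sketch of such a DoFn (processRecord is a
placeholder for your per-record logic, written in the processElement style
used elsewhere in this thread):

new DoFn<Iterable<String>, String>() {
    public void processElement(ProcessContext ctx) throws Exception {
        // All records share the single static key, so they arrive in one
        // Iterable; iterating here keeps the processing sequential.
        for (String record : ctx.element()) {
            ctx.output(processRecord(record)); // placeholder per-record logic
        }
    }
}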


On Mon, Aug 8, 2016 at 2:37 PM, amir bahmanyari <amirtousa@yahoo.com> wrote:

> Thanks so much Thomas.
> Fantastic answer & great learning about what's really going on under
> the hood.
> Have a question on your suggestion: "To do so, you would key the inputs
> to a single static key and apply a GroupByKey, running the processing
> method on the output Iterable produced by the GroupByKey"...
> Wouldn't doing that defeat the "real-time Streaming" objectives?
> To me the above amounts to simulating a simple single-threaded Java
> process, except that it's executing on a massively parallel infrastructure
> in a "fancy" way :-)
> Is there an example that demonstrates how to actually implement your
> suggestion above without any hidden loopholes, please?
> I can at least try it and see how far it gets for R&D purposes & share the
> results with the community.
> Cheers+have a wonderful day.
>
> ------------------------------
> *From:* Thomas Groh <tgroh@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <amirtousa@yahoo.com>
>
> *Sent:* Monday, August 8, 2016 1:44 PM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> There's no way to guarantee that exactly one record is processed at a
> time. This is part of the design of ParDo to work efficiently across
> multiple processes and machines[1], where multiple instances of a DoFn must
> exist in order for progress to be made in a timely fashion. This includes
> processing the same element across multiple machines at the same time, with
> only one of the results being available in the output PCollection, as well
> as retries of failed elements.
>
> A runner is required to interact with a DoFn instance in a single-threaded
> manner - however, it is permitted to have multiple different DoFn instances
> active within a single process and across processes at any given time (for
> the same reasons as above). There's no support in the Beam model to
> restrict this type of execution. We do not encourage sharing objects
> between DoFn instances; any shared state must be accessed in a
> thread-safe manner, and modifications to it should be idempotent, as
> otherwise retries and speculative execution may leave that state
> inconsistent. A DoFn will be reused for multiple elements across a single
> bundle, and may be reused across multiple bundles - if you require the DoFn
> to be "fresh" per element, it should perform any required setup at the
> start of the ProcessElement method.
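>
> For example, a sketch of per-element setup (MyState, setupForElement, and
> process are placeholders, not SDK names):
>
> new DoFn<String, String>() {
>   public void processElement(ProcessContext ctx) throws Exception {
>     // Initialize here, not in the constructor or instance fields, since
>     // the same DoFn instance may be reused across elements and bundles.
>     MyState state = setupForElement();
>     ctx.output(process(ctx.element(), state));
>   }
> }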
>
> The best that can be done if it is absolutely required to restrict
> processing to a single element at a time would be to group all of the
> elements to a single key. Note that this will not solve the problem in all
> cases, as a runner is permitted to execute the group of elements multiple
> times so long as it only takes one completed bundle as the result, and
> additionally this removes the ability of the runner to balance work and
> introduces a performance bottleneck. To do so, you would key the inputs to
> a single static key and apply a GroupByKey, running the processing method
> on the output Iterable produced by the GroupByKey (directly; expanding the
> input iterable in a separate PCollection allows a runner to rebalance the
> elements, which will reintroduce parallelism).
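>
> For example (a sketch; "grouped" stands for the GroupByKey output), this
> expansion is exactly the anti-pattern for the single-threaded goal:
>
> grouped.apply(Values.<Iterable<String>>create())
>     .apply(Flatten.<String>iterables()) // the runner may rebalance these
>     .apply(ParDo.of(new DoFn<String, String>() {...}));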
>
> [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java#L360
>
> On Mon, Aug 8, 2016 at 12:46 PM, amir bahmanyari <amirtousa@yahoo.com>
> wrote:
>
> Hi Thomas,
> Thanks so much for your response. Here are answers to your questions.
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> ==>> I send records to Kafka from my laptop and use KafkaIO() to receive
> the records. I have confirmed that I don't get duplicates from Kafka.
> However,
> for some reason, certain parts of my code execute more times than the
> expected number of records, and subsequently produce extra resulting
> data.
> I tried playing with the triggering: stretching the window interval,
> discardingFiredPanes(), etc., all kinds of modes.
> Same result. How can I guarantee that one record at a time executes in one
> unique instance of the inner class object?
> I have all the shared objects synchronized and am using Java concurrent
> hashmaps. How can I guarantee synchronized operations amongst "parallel
> pipelines"? Analogous to multiple threads accessing a shared object and
> trying to modify it...
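>
> For instance, my updates are of this general form (a simplified sketch;
> lrMap, recordId, and computedValue are placeholders):
>
> ConcurrentHashMap<String, Long> lrMap = new ConcurrentHashMap<String, Long>();
> // inside processElement: an atomic insert, so two threads cannot both add
> lrMap.putIfAbsent(recordId, computedValue);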
>
> Here is my current KafkaIO() call:
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092").withTopics(topics)
>     .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(new
> DoFn<String, String>() { ... // I expect one record at a time per object here
> ------------------------------
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
> ==>>No duplicates from Kafka.
> ------------------------------
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
> ==>>Sorry for my confusing statement. Like I mentioned above, I expect
> each record coming from Kafka to get assigned to one instance of the inner
> class, so that one instance of the pipeline executes it in parallel with
> others executing their own unique records.
>
> ------------------------------
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
> ==>>I have not tried DirectRunner. Should I?
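>
> If I do, I assume the only change is the runner option (a sketch, assuming
> the current incubator-beam class name):
>
> options.setRunner(org.apache.beam.runners.direct.DirectRunner.class);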
>
> Thanks so much Thomas.
>
>
> ------------------------------
> *From:* Thomas Groh <tgroh@google.com>
> *To:* user@beam.incubator.apache.org; amir bahmanyari <
> amirtousa@yahoo.com>
> *Sent:* Monday, August 8, 2016 11:43 AM
> *Subject:* Re: Is Beam pipeline runtime behavior inconsistent?
>
> Just to make sure I understand the problem:
>
> You have a specific collection of records stored in Kafka. You run your
> pipeline, and observe duplicate elements. Is that accurate?
>
> Have you confirmed that you're getting duplicate records via other library
> transforms (such as applying Count.globally() to kafkarecords)?
>
> Additionally, I'm not sure what you mean by "executes till a record lands
> on method"
>
> Additionally additionally, is this reproducible if you execute with the
> DirectRunner?
>
>
> On Sun, Aug 7, 2016 at 11:44 PM, amir bahmanyari <amirtousa@yahoo.com>
> wrote:
>
> Hi Colleagues,
> I refrained from posting this email before completing thorough testing.
> I think I did.
> My core code works perfectly & produces the expected result every single
> time when it is not wrapped with Beam KafkaIO to receive the data.
> Without KafkaIO, it receives the records from a flat data file. I repeated
> it and it always produced the right result.
> When I include a Beam KafkaIO and embed the exact same code in an anonymous
> class running Beam pipelines, I get a different result every time I rerun
> it.
> Below is the snippet from where KafkaIO executes till a record lands on
> method.
> Kafka sends a precise number of records. No duplicates. All good.
> While executing in Beam, when the records are finished & I expect a
> correct result, it always produces something different, and the result
> differs from run to run.
> I appreciate shedding light on this issue.  And thanks for your valuable
> time as always.
> Amir-
>
> public static synchronized void main(String[] args) throws Exception {
>
> // Create Beam Options for the Flink Runner.
> FlinkPipelineOptions options =
>     PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> // Set the Streaming engine as FlinkRunner
> options.setRunner(FlinkPipelineRunner.class);
> // This is a Streaming process (as opposed to Batch=false)
> options.setStreaming(true);
> // Create the DAG pipeline for parallel processing of independent LR records
> Pipeline p = Pipeline.create(options);
> // Kafka broker topic is identified as "lroad"
> List<String> topics = Arrays.asList("lroad");
>
> PCollection<String> kafkarecords = p.apply(KafkaIO.read()
>     .withBootstrapServers("kafkahost:9092").withTopics(topics)
>     .withValueCoder(StringUtf8Coder.of()).withoutMetadata())
>     .apply(Values.<String>create())
>     .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .accumulatingFiredPanes());
>
> kafkarecords.apply(ParDo.named("ProcessLRKafkaData").of(
>     new DoFn<String, String>() {
>       public void processElement(ProcessContext ctx) throws Exception {
>         // My core logic code here.
>       }
>     }));
> .
> .
> p.run(); // Start Beam Pipeline(s) in the Flink cluster
> } // of main
> } // of class
>
