beam-dev mailing list archives

From Thomas Groh <tg...@google.com.INVALID>
Subject Re: KafkaIO Windowing Fn
Date Thu, 01 Sep 2016 02:09:07 GMT
In 0.2.0-incubating and later, we've replaced the DirectPipelineRunner with
the DirectRunner (formerly the InProcessPipelineRunner), which can handle
unbounded pipelines. Is it possible for you to upgrade?

On Wed, Aug 31, 2016 at 5:17 PM, Chawla,Sumit <sumitkchawla@gmail.com>
wrote:

> @Aljoscha, my assumption here is that at least one trigger should fire:
> either the 100 elements or the 30 seconds since the first element,
> whichever happens first.
>
> @Thomas, here is the error I get (I am using 0.1.0-incubating):
>
> java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitPrimitiveTransform(DirectPipelineRunner.java:890)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:225)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:220)
>     ...
>
> Regards
> Sumit Chawla
>
>
> On Wed, Aug 31, 2016 at 10:19 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
> > Hi,
> > could the reason for the second part of the trigger never firing be that
> > there are never at least 100 elements per key? The trigger would only fire
> > if it saw 100 elements, and with only 540 elements that seems unlikely if
> > you have more than 6 keys.
> >
> > Cheers,
> > Aljoscha
> >
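Aljoscha's per-key point can be checked with a small stdlib-only Java model. It is a sketch, not Beam code: it assumes the element-count condition is tracked separately for each key, ignores windowing (which would split the counts further across 10-second windows), and uses a hypothetical round-robin distribution over made-up "tenant-N" keys.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Stdlib-only model, not Beam code: an element-count trigger condition is
// tracked separately for every key, so each key needs `threshold` elements.
final class PerKeyCount {

    // Returns the keys whose element count reaches the threshold.
    static SortedSet<String> keysReachingThreshold(List<String> keys, int threshold) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        SortedSet<String> ready = new TreeSet<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() >= threshold) {
                ready.add(entry.getKey());
            }
        }
        return ready;
    }

    // Hypothetical input: totalElements assigned round-robin to numKeys keys.
    static List<String> roundRobin(int totalElements, int numKeys) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < totalElements; i++) {
            keys.add("tenant-" + (i % numKeys));
        }
        return keys;
    }

    public static void main(String[] args) {
        // 540 / 6 = 90 elements per key, below the threshold of 100.
        System.out.println(keysReachingThreshold(roundRobin(540, 6), 100)); // prints []
        // 540 / 5 = 108 elements per key, so all 5 keys reach it.
        System.out.println(keysReachingThreshold(roundRobin(540, 5), 100).size()); // prints 5
    }
}
```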
> > On Wed, 31 Aug 2016 at 17:47 Thomas Groh <tgroh@google.com.invalid>
> > wrote:
> >
> > > KafkaIO is implemented using the UnboundedRead API, which is supported by
> > > the DirectRunner. You should be able to run without withMaxNumRecords;
> > > if you can't, I'd be very interested to see the stack trace that you get
> > > when you try to run the pipeline.
> > >
> > > On Tue, Aug 30, 2016 at 11:24 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > wrote:
> > >
> > > > Yes. I added it only for the DirectRunner, as it cannot translate
> > > > Read(UnboundedSourceOfKafka).
> > > >
> > > > Regards
> > > > Sumit Chawla
> > > >
> > > >
> > > > On Tue, Aug 30, 2016 at 11:18 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > Ah OK, this might be a stupid question, but did you remove this line
> > > > > when running it with Flink:
> > > > >         .withMaxNumRecords(500)
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Wed, 31 Aug 2016 at 08:14 Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Aljoscha
> > > > > >
> > > > > > The code is not different when running on Flink. I have removed only
> > > > > > business-specific transformations.
> > > > > >
> > > > > > Regards
> > > > > > Sumit Chawla
> > > > > >
> > > > > >
> > > > > > On Tue, Aug 30, 2016 at 4:49 AM, Aljoscha Krettek <aljoscha@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > could you maybe also post the complete code that you're using with the
> > > > > > > FlinkRunner? I could have a look into it.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Aljoscha
> > > > > > >
> > > > > > > On Tue, 30 Aug 2016 at 09:01 Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Thomas
> > > > > > > >
> > > > > > > > Sorry, I tried with the DirectRunner but ran into some Kafka issues.
> > > > > > > > Following is the snippet I am working on; I will post more details
> > > > > > > > once I get it working (as of now I am unable to read messages from
> > > > > > > > Kafka using the DirectRunner).
> > > > > > > >
> > > > > > > >
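> > > > > > > > import java.nio.charset.StandardCharsets;
> > > > > > > >
> > > > > > > > import com.google.common.collect.ImmutableList;
> > > > > > > > import com.google.common.collect.ImmutableMap;
> > > > > > > > import org.apache.beam.sdk.Pipeline;
> > > > > > > > import org.apache.beam.sdk.io.kafka.KafkaIO;
> > > > > > > > import org.apache.beam.sdk.io.kafka.KafkaRecord;
> > > > > > > > import org.apache.beam.sdk.options.PipelineOptions;
> > > > > > > > import org.apache.beam.sdk.options.PipelineOptionsFactory;
> > > > > > > > import org.apache.beam.sdk.runners.DirectPipelineRunner;
> > > > > > > > import org.apache.beam.sdk.transforms.DoFn;
> > > > > > > > import org.apache.beam.sdk.transforms.GroupByKey;
> > > > > > > > import org.apache.beam.sdk.transforms.ParDo;
> > > > > > > > import org.apache.beam.sdk.transforms.windowing.AfterFirst;
> > > > > > > > import org.apache.beam.sdk.transforms.windowing.AfterPane;
> > > > > > > > import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
> > > > > > > > import org.apache.beam.sdk.transforms.windowing.FixedWindows;
> > > > > > > > import org.apache.beam.sdk.transforms.windowing.Repeatedly;
> > > > > > > > import org.apache.beam.sdk.transforms.windowing.Window;
> > > > > > > > import org.apache.beam.sdk.values.KV;
> > > > > > > > import org.apache.kafka.clients.consumer.ConsumerConfig;
> > > > > > > > import org.joda.time.Duration;
> > > > > > > >
> > > > > > > > // The wrapper class name is hypothetical; it only makes the snippet compile.
> > > > > > > > public class KafkaWindowingExample {
> > > > > > > >     public static void main(String[] args) {
> > > > > > > >         PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
> > > > > > > >         pipelineOptions.setRunner(DirectPipelineRunner.class);
> > > > > > > >         Pipeline pipeline = Pipeline.create(pipelineOptions);
> > > > > > > >         pipeline.apply(KafkaIO.read()
> > > > > > > >                 .withMaxNumRecords(500) // bounds the read for the DirectPipelineRunner
> > > > > > > >                 .withTopics(ImmutableList.of("mytopic"))
> > > > > > > >                 .withBootstrapServers("localhost:9092")
> > > > > > > >                 .updateConsumerProperties(ImmutableMap.<String, Object>of(
> > > > > > > >                         ConsumerConfig.GROUP_ID_CONFIG, "test1",
> > > > > > > >                         ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
> > > > > > > >                         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")))
> > > > > > > >             // Decode each Kafka record's key and value bytes as UTF-8 strings.
> > > > > > > >             .apply(ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<String, String>>() {
> > > > > > > >                 @Override
> > > > > > > >                 public void processElement(ProcessContext c) throws Exception {
> > > > > > > >                     KV<byte[], byte[]> record = c.element().getKV();
> > > > > > > >                     c.output(KV.of(new String(record.getKey(), StandardCharsets.UTF_8),
> > > > > > > >                             new String(record.getValue(), StandardCharsets.UTF_8)));
> > > > > > > >                 }
> > > > > > > >             }))
> > > > > > > >             // 10-second windows; fire when a key has 100 elements or 30 s after
> > > > > > > >             // the first element in the pane, whichever comes first.
> > > > > > > >             .apply("WindowBy10Sec", Window.<KV<String, String>>into(
> > > > > > > >                             FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > >                     .withAllowedLateness(Duration.standardSeconds(1))
> > > > > > > >                     .triggering(Repeatedly.forever(AfterFirst.of(
> > > > > > > >                             AfterProcessingTime.pastFirstElementInPane()
> > > > > > > >                                     .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > >                             AfterPane.elementCountAtLeast(100))))
> > > > > > > >                     .discardingFiredPanes())
> > > > > > > >             .apply("GroupByTenant", GroupByKey.<String, String>create())
> > > > > > > >             // Print how many values each key received in the fired pane.
> > > > > > > >             .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
> > > > > > > >                 @Override
> > > > > > > >                 public void processElement(ProcessContext c) throws Exception {
> > > > > > > >                     KV<String, Iterable<String>> element = c.element();
> > > > > > > >                     int count = 0;
> > > > > > > >                     for (String ignored : element.getValue()) {
> > > > > > > >                         count++;
> > > > > > > >                     }
> > > > > > > >                     System.out.println(String.format("Key %s Value Count %d",
> > > > > > > >                             element.getKey(), count));
> > > > > > > >                 }
> > > > > > > >             }));
> > > > > > > >         pipeline.run();
> > > > > > > >     }
> > > > > > > > }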
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > Regards
> > > > > > > > Sumit Chawla
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 26, 2016 at 9:46 AM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > If you use the DirectRunner, do you observe the same behavior?
> > > > > > > > >
> > > > > > > > > On Thu, Aug 25, 2016 at 4:32 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Thomas
> > > > > > > > > >
> > > > > > > > > > I am using the FlinkRunner. Yes, the second part of the trigger never
> > > > > > > > > > fires for me.
> > > > > > > > > >
> > > > > > > > > > Regards
> > > > > > > > > > Sumit Chawla
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Thu, Aug 25, 2016 at 4:18 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hey Sumit;
> > > > > > > > > > >
> > > > > > > > > > > What runner are you using? I can set up a test with the same trigger
> > > > > > > > > > > reading from an unbounded input using the DirectRunner, and I get the
> > > > > > > > > > > expected output panes.
> > > > > > > > > > >
> > > > > > > > > > > Just to clarify, the second half of the trigger ('when the first
> > > > > > > > > > > element has been there for at least 30 seconds') simply never fires?
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Aug 25, 2016 at 2:38 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Thomas
> > > > > > > > > > > >
> > > > > > > > > > > > That did not work.
> > > > > > > > > > > >
> > > > > > > > > > > > I tried the following instead:
> > > > > > > > > > > >
> > > > > > > > > > > > .triggering(
> > > > > > > > > > > >         Repeatedly.forever(
> > > > > > > > > > > >                 AfterFirst.of(
> > > > > > > > > > > >                         AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > >                                 .plusDelayOf(Duration.standardSeconds(30)),
> > > > > > > > > > > >                         AfterPane.elementCountAtLeast(100))))
> > > > > > > > > > > > .discardingFiredPanes()
> > > > > > > > > > > >
> > > > > > > > > > > > What I am trying to do here is make sure that follow-up operations
> > > > > > > > > > > > receive batches of records:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Fire when the pane has 100+ elements,
> > > > > > > > > > > > 2. or fire when the first element has been there for at least 30 seconds.
> > > > > > > > > > > >
> > > > > > > > > > > > However, point 2 does not seem to work. For example, I have 540 records
> > > > > > > > > > > > in Kafka. The first 500 records are available immediately, but the
> > > > > > > > > > > > remaining 40 don't pass through. I was expecting the 2nd trigger to
> > > > > > > > > > > > help here.
> > > > > > > > > > > >
> > > > > > > > > > > > Regards
> > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > >
> > > > > > > > > > > >
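The batching behavior described above can be sketched, for a single key and window, with the stdlib-only Java model below. It is not the Beam API: the class and method names are hypothetical, and it assumes that processing time advances independently of the watermark and that Repeatedly.forever with discardingFiredPanes() simply resets the pane after each firing. Under those assumptions the 30-second branch alone would emit the 40 leftover records.

```java
import java.time.Duration;
import java.time.Instant;

// Stdlib-only model (not the Beam API) of
//   Repeatedly.forever(AfterFirst.of(
//       AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay),
//       AfterPane.elementCountAtLeast(maxCount)))
//   + discardingFiredPanes()
// for one key and one window.
final class BatchingTrigger {
    private final int maxCount;
    private final Duration delay;
    private int count = 0;
    private Instant firstElementTime = null; // processing time of the pane's first element

    BatchingTrigger(int maxCount, Duration delay) {
        this.maxCount = maxCount;
        this.delay = delay;
    }

    // Adds one element that arrived at processing time `now`.
    void onElement(Instant now) {
        if (firstElementTime == null) {
            firstElementTime = now;
        }
        count++;
    }

    // AfterFirst: fire as soon as either sub-condition holds at processing time `now`.
    boolean shouldFire(Instant now) {
        if (count == 0) {
            return false;
        }
        boolean countReached = count >= maxCount;
        boolean delayElapsed = !now.isBefore(firstElementTime.plus(delay));
        return countReached || delayElapsed;
    }

    // Emits the pane size and clears state, like discardingFiredPanes() under Repeatedly.forever.
    int fire() {
        int emitted = count;
        count = 0;
        firstElementTime = null;
        return emitted;
    }

    // Helper: do `elements` elements arriving together fire `secondsLater` seconds afterwards?
    static boolean firesAt(int elements, long secondsLater) {
        Instant t0 = Instant.parse("2016-08-25T00:00:00Z");
        BatchingTrigger trigger = new BatchingTrigger(100, Duration.ofSeconds(30));
        for (int i = 0; i < elements; i++) {
            trigger.onElement(t0);
        }
        return trigger.shouldFire(t0.plusSeconds(secondsLater));
    }

    public static void main(String[] args) {
        System.out.println(firesAt(40, 10));  // prints false: 40 < 100 and only 10 s elapsed
        System.out.println(firesAt(40, 30));  // prints true: the 30 s delay has elapsed
        System.out.println(firesAt(100, 0));  // prints true: the count condition is met
    }
}
```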
> > > > > > > > > > > > On Thu, Aug 25, 2016 at 1:13 PM, Thomas Groh <tgroh@google.com.invalid>
> > > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > You can adjust the trigger in the windowing transform if your sink can
> > > > > > > > > > > > > handle being written to multiple times for the same window. For example,
> > > > > > > > > > > > > if the sink appends to the output when it receives new data in a window,
> > > > > > > > > > > > > you could add something like
> > > > > > > > > > > > >
> > > > > > > > > > > > > Window.into(...)
> > > > > > > > > > > > >     .withAllowedLateness(...)
> > > > > > > > > > > > >     .triggering(AfterWatermark.pastEndOfWindow()
> > > > > > > > > > > > >         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
> > > > > > > > > > > > >             .plusDelayOf(Duration.standardSeconds(5)))
> > > > > > > > > > > > >         .withLateFirings(AfterPane.elementCountAtLeast(1)))
> > > > > > > > > > > > >     .discardingFiredPanes();
> > > > > > > > > > > > >
> > > > > > > > > > > > > This will cause elements to be output some amount of time after they are
> > > > > > > > > > > > > first received from Kafka, even if Kafka does not have any new elements.
> > > > > > > > > > > > > Elements will only be output by the GroupByKey once.
> > > > > > > > > > > > >
> > > > > > > > > > > > > We should still have a JIRA to improve the KafkaIO watermark tracking in
> > > > > > > > > > > > > the absence of new records.
> > > > > > > > > > > > >
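A stdlib-only Java model of the firing decision for the trigger suggested above, for one key and one window, may help show why its early firing does not depend on the watermark advancing. It is a sketch, not the Beam API: the enum and method names are hypothetical, all times are plain seconds (window end and watermark in event time, first-element and current times in processing time), "past the end of the window" is simplified to watermark >= window end, and empty panes never fire.

```java
// Stdlib-only model (not the Beam API) of
//   AfterWatermark.pastEndOfWindow()
//       .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(earlyDelay))
//       .withLateFirings(AfterPane.elementCountAtLeast(1))
// for one key and one window. Empty panes never fire in this model.
final class EarlyFiringModel {
    enum Firing { NONE, EARLY, ON_TIME, LATE }

    static Firing decide(long windowEndSec, long watermarkSec, boolean onTimeFired,
                         int paneCount, long firstElementSec, long nowSec, long earlyDelaySec) {
        if (paneCount == 0) {
            return Firing.NONE;
        }
        if (watermarkSec >= windowEndSec) {
            // Watermark has passed the window: one on-time pane, then late panes
            // whenever at least one late element is present.
            return onTimeFired ? Firing.LATE : Firing.ON_TIME;
        }
        // Watermark still inside the window: only the processing-time early firing applies.
        return nowSec >= firstElementSec + earlyDelaySec ? Firing.EARLY : Firing.NONE;
    }

    public static void main(String[] args) {
        // Watermark stuck at 0 for a window ending at 10; first element seen at processing time 100.
        System.out.println(decide(10, 0, false, 40, 100, 104, 5)); // prints NONE
        System.out.println(decide(10, 0, false, 40, 100, 105, 5)); // prints EARLY
        System.out.println(decide(10, 10, false, 3, 100, 200, 5)); // prints ON_TIME
        System.out.println(decide(10, 12, true, 1, 100, 200, 5));  // prints LATE
    }
}
```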
> > > > > > > > > > > > > On Thu, Aug 25, 2016 at 10:29 AM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks Raghu.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I don't have much control over changing KafkaIO properties; I added
> > > > > > > > > > > > > > the KafkaIO code only to complete the example. Are there any changes
> > > > > > > > > > > > > > that can be made to the windowing to achieve the same behavior?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 5:06 PM, Raghu Angadi <rangadi@google.com.invalid>
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The default implementation returns the processing timestamp of the last
> > > > > > > > > > > > > > > record (in effect; more accurately, it returns the same as
> > > > > > > > > > > > > > > getTimestamp(), which might be overridden by the user).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > As a workaround, yes, you can provide your own watermarkFn that
> > > > > > > > > > > > > > > essentially returns Now() or Now()-1sec (usage in the javadoc:
> > > > > > > > > > > > > > > <https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L138>).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I think the default watermark should be smarter: it should advance to
> > > > > > > > > > > > > > > the current time if there aren't any records to read from Kafka. Could
> > > > > > > > > > > > > > > you file a JIRA?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > thanks,
> > > > > > > > > > > > > > > Raghu.
> > > > > > > > > > > > > > >
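Raghu's explanation can be illustrated with the stdlib-only Java sketch below, which compares the two watermark behaviors while the topic is idle. It does not use the KafkaIO API (the javadoc linked above shows the real hook); the method names, the 1-second slack, the timestamps, and the fixed clock are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Stdlib-only model (not the KafkaIO API) of a source watermark while no new records arrive.
final class IdleWatermark {

    // Default behavior as described above: the watermark stays at the last record's timestamp.
    static Instant lastRecordWatermark(Instant lastRecordTimestamp) {
        return lastRecordTimestamp;
    }

    // Workaround as described above: a wall-clock watermark, Now() minus a small slack.
    static Instant wallClockWatermark(Supplier<Instant> clock, Duration slack) {
        return clock.get().minus(slack);
    }

    // A window's on-time pane can fire once the watermark reaches the window's end.
    static boolean windowCanFire(Instant watermark, Instant windowEnd) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant lastRecord = Instant.parse("2016-08-24T12:00:05Z");
        Instant windowEnd = Instant.parse("2016-08-24T12:00:10Z");
        // Hypothetical fixed clock: five minutes have passed with no new records.
        Supplier<Instant> clock = () -> Instant.parse("2016-08-24T12:05:00Z");

        System.out.println(windowCanFire(lastRecordWatermark(lastRecord), windowEnd)); // prints false
        System.out.println(windowCanFire(
                wallClockWatermark(clock, Duration.ofSeconds(1)), windowEnd)); // prints true
    }
}
```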
> > > > > > > > > > > > > > > On Wed, Aug 24, 2016 at 2:10 PM, Chawla,Sumit <sumitkchawla@gmail.com>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi All
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I am trying to do some simple batch processing on KafkaIO records. My
> > > > > > > > > > > > > > > > Beam pipeline looks like the following:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > pipeline.apply(KafkaIO.read()
> > > > > > > > > > > > > > > >         .withTopics(ImmutableList.of("mytopic"))
> > > > > > > > > > > > > > > >         .withBootstrapServers("localhost:9200"))
> > > > > > > > > > > > > > > >     .apply("ExtractMessage", ParDo.of(new ExtractKVMessage())) // emits a KV<String, String>
> > > > > > > > > > > > > > > >     .apply("WindowBy10Sec", Window.<KV<String, String>>into(
> > > > > > > > > > > > > > > >             FixedWindows.of(Duration.standardSeconds(10)))
> > > > > > > > > > > > > > > >         .withAllowedLateness(Duration.standardSeconds(1)))
> > > > > > > > > > > > > > > >     .apply("GroupByKey", GroupByKey.create())
> > > > > > > > > > > > > > > >     .apply("Sink", ParDo.of(new MySink()));
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > My Kafka source already has 1000+ messages, and new messages arrive
> > > > > > > > > > > > > > > > every few minutes.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > When I start my pipeline, I can see that it reads all of the 1000+
> > > > > > > > > > > > > > > > messages from Kafka. However, the window does not fire until a new
> > > > > > > > > > > > > > > > message arrives in Kafka, and the sink does not receive any message
> > > > > > > > > > > > > > > > until that point. Do I need to override the watermarkFn here? Since I
> > > > > > > > > > > > > > > > am not providing any timestampFn, I am assuming that timestamps will be
> > > > > > > > > > > > > > > > assigned as each message arrives, i.e. ingestion time. What is the
> > > > > > > > > > > > > > > > default watermarkFn implementation? Is the window not supposed to be
> > > > > > > > > > > > > > > > fired based on ingestion time?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Regards
> > > > > > > > > > > > > > > > Sumit Chawla
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
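The original symptom can be reproduced with a stdlib-only Java model of 10-second fixed windows and the default watermark-driven trigger, under which a window fires once the watermark reaches its end. It is a sketch, not the Beam API: timestamps are hypothetical milliseconds, windows are aligned to the epoch, and the watermark is assumed to equal the last record's timestamp, as described in the thread.

```java
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

// Stdlib-only model (not the Beam API): epoch-aligned fixed windows plus a
// trigger that fires a window once the watermark reaches the window's end.
final class FixedWindowFiring {

    // Start of the fixed window containing the timestamp (milliseconds).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Starts of the non-empty windows whose end is at or before the watermark.
    static SortedSet<Long> firedWindows(List<Long> timestamps, long sizeMillis, long watermarkMillis) {
        SortedSet<Long> fired = new TreeSet<>();
        for (long ts : timestamps) {
            long start = windowStart(ts, sizeMillis);
            if (start + sizeMillis <= watermarkMillis) {
                fired.add(start);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        long size = 10_000; // 10-second windows
        List<Long> backlog = List.of(1_000L, 12_000L, 23_000L);
        // Watermark = last record's timestamp (23 s): the [20 s, 30 s) window stays open.
        System.out.println(firedWindows(backlog, size, 23_000)); // prints [0, 10000]
        // A new record at 95 s moves the watermark forward and closes that window.
        List<Long> withNewRecord = List.of(1_000L, 12_000L, 23_000L, 95_000L);
        System.out.println(firedWindows(withNewRecord, size, 95_000)); // prints [0, 10000, 20000]
    }
}
```

Until a record beyond the newest window arrives, the watermark stays inside that window, so its contents are held back.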
