flink-dev mailing list archives

From Vinay Patil <vinay18.pa...@gmail.com>
Subject Re: Junit Issue while testing Kafka Source
Date Fri, 27 May 2016 03:11:48 GMT
Yeah, understood.
Thank you for helping, guys.

Regards,
Vinay Patil

*+91-800-728-4749*

On Thu, May 26, 2016 at 5:40 PM, Stephan Ewen <sewen@apache.org> wrote:

> The SuccessException does not really have a dependency.
>
> It is just a special Exception class that you throw in your code when you
> want to stop.
> The code that calls "env.execute()" catches the exception and checks
> whether the failure cause is that special exception.
> Flink propagates the exceptions from the workers back to the client.
>
> Greetings,
> Stephan
>
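A minimal sketch of the SuccessException pattern described above, in plain Java with no Flink dependency. The runJob method is a hypothetical stand-in for "env.execute()", and it assumes the runtime wraps the original exception before rethrowing it on the client, which is why the check walks the cause chain. All class and method names here are illustrative, not Flink API.

```java
import java.util.Arrays;
import java.util.List;

/** Marker exception thrown by test code once it has seen what it was waiting for. */
class SuccessException extends RuntimeException {
    SuccessException() {
        super("test condition met");
    }
}

class SuccessExceptionDemo {

    /** Walks the cause chain, since the runtime may wrap the original exception. */
    static boolean causedBySuccess(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SuccessException) {
                return true;
            }
        }
        return false;
    }

    /** Hypothetical stand-in for env.execute(): processes records until user code throws. */
    static void runJob(List<String> records) {
        try {
            for (String record : records) {
                // "User function": stop the job as soon as a non-empty record arrives.
                if (!record.isEmpty()) {
                    throw new SuccessException();
                }
            }
        } catch (RuntimeException e) {
            // Assumption: the runtime wraps worker failures before handing them to the client.
            throw new RuntimeException("Job execution failed", e);
        }
    }

    /** Returns true if the job ended via SuccessException; rethrows any other failure. */
    static boolean runAndCheck(List<String> records) {
        try {
            runJob(records);
            return false; // the job finished without signaling success
        } catch (RuntimeException e) {
            if (causedBySuccess(e)) {
                return true;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCheck(Arrays.asList("", "hello")));
    }
}
```

In a JUnit test, the body of runAndCheck would sit around the real "env.execute()" call, and the test would fail if the job ends without the marker exception.
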
> On Thu, May 26, 2016 at 12:37 PM, Vinay Patil <vinay18.patil@gmail.com>
> wrote:
>
> > Hi Stephan,
> >
> > Yes, the DeserializationSchema solution will definitely work.
> > However, I am not able to find the dependency for SuccessException.
> > Any help on this?
> >
> > Regards,
> > Vinay Patil
> >
> > *+91-800-728-4749*
> >
> > On Thu, May 26, 2016 at 3:32 PM, Stephan Ewen <sewen@apache.org> wrote:
> >
> > > Hi!
> > >
> > > On Flink 1.0, there is the "flink-test-utils_2.10" dependency that has
> > > some useful things.
> > >
> > > The "SuccessException" seems to be quite a common thing - I have seen
> > > that in other infinite program tests as well (Google Dataflow / Beam).
> > >
> > > Another way you can architect tests is to have an element in the stream
> > > that signals end-of-stream. The DeserializationSchema can check for that
> > > and return "end of stream".
> > >
> > > Greetings,
> > > Stephan
> > >
> > >
> > >
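A minimal sketch of the end-of-stream marker idea, in plain Java. Flink's DeserializationSchema interface has an isEndOfStream method; the EndMarkerSchema class below is only a local stand-in that mirrors deserialize and isEndOfStream so the example runs without Flink on the classpath. The "__END__" sentinel and the consume loop are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Local stand-in mirroring two methods of Flink's DeserializationSchema. */
class EndMarkerSchema {
    /** Hypothetical sentinel record that the test producer writes last. */
    static final String END_MARKER = "__END__";

    String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    /** Tells the consumer to stop once the sentinel has been read. */
    boolean isEndOfStream(String nextElement) {
        return END_MARKER.equals(nextElement);
    }
}

class EndMarkerDemo {

    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Hypothetical consumer loop: stops as soon as the schema reports end of stream. */
    static List<String> consume(List<byte[]> messages, EndMarkerSchema schema) {
        List<String> out = new ArrayList<>();
        for (byte[] raw : messages) {
            String element = schema.deserialize(raw);
            if (schema.isEndOfStream(element)) {
                break; // the source finishes, so the job can terminate normally
            }
            out.add(element);
        }
        return out;
    }

    public static void main(String[] args) {
        List<byte[]> topic = Arrays.asList(
                bytes("a"), bytes("b"), bytes(EndMarkerSchema.END_MARKER), bytes("ignored"));
        System.out.println(consume(topic, new EndMarkerSchema()));
    }
}
```

With this design the test producer writes the sentinel as its last record, so the job ends on its own and the assertions can run after "env.execute()" returns.
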
> > > On Thu, May 26, 2016 at 11:55 AM, Vinay Patil <vinay18.patil@gmail.com>
> > > wrote:
> > >
> > > > Hi Aljoscha,
> > > >
> > > > Thank you for answering.
> > > > Throwing SuccessException is a good idea; however, when I add the
> > > > following dependency, no classes are added to the jar:
> > > >
> > > > <dependency>
> > > >   <groupId>org.apache.flink</groupId>
> > > >   <artifactId>flink-tests_2.10</artifactId>
> > > >   <version>1.0.3</version>
> > > > </dependency>
> > > >
> > > > Is there any other dependency that I have to add? I have also added
> > > > the test-utils dependency.
> > > >
> > > > I am trying the following in my test case:
> > > > 1) Consuming data from Kafka using FlinkKafkaConsumer and passing it to
> > > > map as Tuple2
> > > > 2) In the map function I am just checking if Tuple2 contains data; if
> > > > yes, throw the exception ("success")
> > > > 3) This way I am verifying that the configuration is correct and that we
> > > > are able to read from Kafka.
> > > >
> > > > Am I doing it right? Is there a better approach?
> > > >
> > > > Regards,
> > > > Vinay Patil
> > > >
> > > > *+91-800-728-4749*
> > > >
> > > > On Thu, May 26, 2016 at 1:01 PM, Aljoscha Krettek <aljoscha@apache.org>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > > what we are doing in most internal tests is to verify in a sink whether
> > > > > the data is correct and then throw a SuccessException. This brings down
> > > > > the job and we check whether we catch a SuccessException to verify that
> > > > > the test was successful. Look, for example, at the ValidatingSink in
> > > > > EventTimeWindowCheckpointingITCase in the Flink source.
> > > > >
> > > > > Cheers,
> > > > > Aljoscha
> > > > >
> > > > > On Thu, 26 May 2016 at 01:58 Nick Dimiduk <ndimiduk@gmail.com> wrote:
> > > > >
> > > > > > I'm also curious for a solution here. My test code executes the flow
> > > > > > from a separate thread. Once I've joined on all my producer threads and
> > > > > > I've verified the output, I simply interrupt the flow thread. This spews
> > > > > > exceptions, but it all appears to be harmless.
> > > > > >
> > > > > > Maybe there's a better way? I think you'd need some "death pill" to
> > > > > > send into the stream that signals its termination.
> > > > > >
> > > > > > On Tue, May 24, 2016 at 7:29 PM, Vinay Patil <vinay18.patil@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I am able to read from a topic using FlinkKafkaConsumer and return
> > > > > > > the result. However, when I test this scenario in JUnit, the result
> > > > > > > is printed (kafkaStream.print()) but I am not able to exit the job;
> > > > > > > env.execute keeps running.
> > > > > > > I tried to return env.execute from the method, but that did not work
> > > > > > > either.
> > > > > > >
> > > > > > > 1) Is there any way to end the execution of the job forcefully?
> > > > > > > 2) How do I test if the data has come from the topic?
> > > > > > >
> > > > > > >    - One way I can think of is to get the output of stream.print() in a
> > > > > > >    PrintStream and check the result (but I am not able to test this
> > > > > > >    since the job is not exiting).
> > > > > > >
> > > > > > > Please help with these issues.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Vinay Patil
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
