flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Unit tests failing, losing stream contents
Date Fri, 12 Aug 2016 09:39:16 GMT
Hi David!

I would guess that the first exception happens once in a while, as part of
a rare race condition. As Max said, two executions happen simultaneously.
We should fix that race condition, though.
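To make the race concrete, here is a toy sketch in plain Scala (this is not
the Flink API; the names are made up for illustration). The same "job" body
runs twice against one shared sink: once in a background thread, the way
DataStreamUtils.collect(..) starts the job, and once synchronously, the way
env.execute() does.

```scala
import scala.collection.mutable.ArrayBuffer

object RaceSketch {
  // Shared "sink" that both executions write into.
  val sink = ArrayBuffer.empty[Int]

  // The job body: emits 1, 2, 3 into the sink.
  def job(): Unit = sink.synchronized {
    (1 to 3).foreach(sink += _)
  }

  def main(args: Array[String]): Unit = {
    // Execution no. 1: started asynchronously, like DataStreamUtils.collect(..).
    val background = new Thread(() => job())
    background.start()
    // Execution no. 2: runs the same job again, like env.execute().
    job()
    background.join()
    // Every element arrives twice; a test that indexes the sink at a fixed
    // position, or reads it before the background run finishes, sees
    // unpredictable results.
    println(sink.sorted.mkString(","))  // 1,1,2,2,3,3
  }
}
```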

The second exception looks like it is purely part of your application code.

Greetings,
Stephan

On Fri, Aug 12, 2016 at 11:37 AM, Maximilian Michels <mxm@apache.org> wrote:

> Hi David,
>
> You're starting two executions at the same time (in different
> threads). Here's why:
>
> Execution No 1
> DataStreamUtils.collect(..) starts a Thread which executes your job
> and collects stream elements. It runs asynchronously. The collect(..)
> method returns after starting the thread.
>
> Execution No 2
> env.execute() also executes your job.
>
> Now these two race against each other causing all kinds of strange
> behavior.
>
> In general, the use of DataStreamUtils is discouraged. Instead, define
> a sink to write your data to a file, or verify behavior directly as
> part of your Flink job (e.g. in a map function).
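The pattern Max suggests boils down to "run the job exactly once, blocking,
then assert". A toy sketch in plain Scala (again not the Flink API;
`executeJob` and `collected` are made-up stand-ins for a blocking
env.execute() and a test sink):

```scala
import scala.collection.mutable.ArrayBuffer

object SinkTestSketch {
  // Hypothetical test sink: collects everything the job emits.
  val collected = ArrayBuffer.empty[String]

  // Stand-in for a blocking env.execute(): runs the pipeline exactly once
  // and returns only after every element has reached the sink.
  def executeJob(input: Seq[String]): Unit =
    input.map(_.toUpperCase).foreach(collected += _)

  def main(args: Array[String]): Unit = {
    executeJob(Seq("a", "b", "c"))
    // Assertions run only after the blocking execute has returned, so the
    // sink is guaranteed to be complete -- no IndexOutOfBoundsException.
    assert(collected.size == 3)
    assert(collected(2) == "C")
    println(collected.mkString(","))  // A,B,C
  }
}
```

The key point is that there is a single execution, and the assertions only
touch the sink after that execution has finished.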
>
> Cheers,
> Max
>
> On Thu, Aug 11, 2016 at 5:47 PM, Ciar, David B. <dciar86@ceh.ac.uk> wrote:
> > Hi everyone,
> >
> >
> > I've been trying to write unit tests for my data stream bolts (map,
> > flatMap, apply etc.), but the results I've been getting are strange.
> > The code for testing is here (running with scalatest and sbt):
> >
> >
> > https://gist.github.com/dbciar/7469adfea9e6442cdc9568aed07095ff
> >
> >
> > It runs the stream processing environment once for each check. For one
> > of the checks (output below) I get an "IllegalStateException: Factory
> > has already been initialized", whose cause I'm not sure of, while for
> > the rest I get an IndexOutOfBoundsException.
> >
> >
> > The index exception is strange, as the index positions are within the
> > number of input tuples to the stream, so it is as if some are being
> > lost, or the assert is running before the stream has finished
> > processing and adding objects to the rawStreamOutput
> > Iterator[RawObservation].
> >
> >
> > Any pointers on what might be happening would be appreciated. I'd
> > also welcome suggestions on how to incorporate the Redis server in
> > this check; I had to comment out the server definition here and run
> > it separately to get to the current position.
> >
> >
> > The two types of exception are first this:
> >
> >
> > [info] - Do well-formed observations parse OK? *** FAILED ***
> > [info]   java.lang.IllegalStateException: Factory has already been initialized
> > [info]   at org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
> > [info]   at org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
> > [info]   at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
> > [info]   at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
> > [info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
> > [info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
> > [info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > [info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > [info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
> > [info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> >
> > The rest are as follows, with the index being the position I expect
> > to exist in the rawStreamOutput Iterator[RawObservation]:
> >
> >
> > [info]   java.lang.IndexOutOfBoundsException: 3
> > [info]   at scala.collection.immutable.Vector.checkRangeConvert(Vector.scala:132)
> > [info]   at scala.collection.immutable.Vector.apply(Vector.scala:122)
> > [info]   at ProcessingBoltTest$$anonfun$5$$anon$7.<init>(ProcessingBoltTest.scala:93)
> > [info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
> > [info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
> > [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> > [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> > [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> > [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> > [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> >
> >
> >
> > Thanks,
> >
> > David
> >
> >
>
