beam-user mailing list archives

From: Motty Gruda <mott...@gmail.com>
Subject: Re: Handling errors in IOs
Date: Sun, 11 Feb 2018 11:58:42 GMT
runner: spark runner (1.6.3)
beam: 2.2.0
activemq: 5.14.3

code:

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.jms.JmsIO;
import org.apache.beam.sdk.io.jms.JmsRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;

Pipeline p =
    Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

// One connection factory per broker: a and b are the sources, c is the sink.
ConnectionFactory factory_a = new ActiveMQConnectionFactory("tcp://activemq-a:61616");
ConnectionFactory factory_b = new ActiveMQConnectionFactory("tcp://activemq-b:61616");
ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");

// Read from queue "a" on broker a and queue "b" on broker b.
PCollection<JmsRecord> a = p.apply("ReadFromQueue",
    JmsIO.read().withConnectionFactory(factory_a).withUsername("admin").withPassword("admin")
        .withQueue("a"));
PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
    JmsIO.read().withConnectionFactory(factory_b).withUsername("admin").withPassword("admin")
        .withQueue("b"));

// Merge both sources into a single PCollection.
PCollection<JmsRecord> combined =
    PCollectionList.of(a).and(b).apply(Flatten.pCollections());

// Extract each payload and write it to queue "c" on broker c.
combined.apply(MapElements.into(TypeDescriptors.strings()).via((JmsRecord r) -> r.getPayload()))
    .apply("WriteToQueue",
        JmsIO.write().withConnectionFactory(factory_c).withUsername("admin")
            .withPassword("admin").withQueue("c"));

p.run().waitUntilFinish();

After killing activemq-a, the job dies with the following stack trace:

18/02/11 11:39:59 INFO CacheManager: Partition rdd_1569_0 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1565_0 not found, computing it
18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Got assigned task 924
18/02/11 11:39:59 INFO Executor: Running task 1.2 in stage 349.0 (TID 924)
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1561_0 not found, computing it
18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1673_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1557_0 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1669_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1553_0 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1665_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1549_0 not found, computing it
18/02/11 11:39:59 INFO BlockManager: Found block rdd_1528_0 locally
18/02/11 11:39:59 INFO StateSpecFunctions: Continue reading from an existing CheckpointMark.
18/02/11 11:39:59 ERROR Executor: Exception in task 0.1 in stage 349.0 (TID 923)
java.lang.RuntimeException: Failed to read from reader.
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:204)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:149)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
	... (the preceding four MapWithStateRDD frames repeat 30 more times) ...
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
	at org.apache.spark.scheduler.Task.run(Task.scala:89)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: java.lang.NullPointerException
	at org.apache.beam.sdk.io.jms.JmsIO$UnboundedJmsReader.advance(JmsIO.java:472)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:249)
	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
	... 148 more
Caused by: java.lang.NullPointerException
	at org.apache.beam.sdk.io.jms.JmsIO$UnboundedJmsReader.advance(JmsIO.java:436)
	... 151 more
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1661_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1657_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1653_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1649_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1645_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1641_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1637_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1633_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1629_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1625_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1621_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1617_1 not found, computing it
18/02/11 11:39:59 INFO CacheManager: Partition rdd_1613_1 not found, computing it
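
The root cause above is a NullPointerException inside JmsIO's reader once the broker connection is gone. As a rough illustration of the failure mode only (the class, fields, and method body below are assumptions for the sketch, not JmsIO's actual code), this is what an unbounded reader looks like when advance() dereferences a dead consumer instead of returning false:

import java.io.IOException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

// Hedged sketch of the failure mode in the trace above; NOT JmsIO's real code.
// Once activemq-a is killed, the JMS consumer is modeled here as null, and
// advance() throws instead of reporting "no data available yet".
class UnboundedJmsReaderSketch {
  private MessageConsumer consumer; // assumed null/stale after the broker dies
  private String currentPayload;

  public boolean advance() throws IOException {
    try {
      Message message = consumer.receiveNoWait(); // NPE when consumer is null
      if (message == null) {
        return false; // nothing to read right now; the pipeline keeps running
      }
      currentPayload = ((TextMessage) message).getText();
      return true;
    } catch (Exception e) {
      // Matches the trace: an IOException caused by the NullPointerException,
      // which the Spark runner escalates to a failed task.
      throw new IOException(e);
    }
  }
}

A more defensive advance() would treat a missing or closed consumer as "no data" or attempt to reconnect, which is roughly the behavior the original question asks for.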



Running the same code on Spark 2.2.0 and Beam 2.3.0-RC3, only half of the
messages sent ended up in queue c!
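
For context on the delivery semantics involved (see JB's reply below): a Beam unbounded source acknowledges consumed elements only when the runner finalizes a checkpoint, through UnboundedSource.CheckpointMark. A minimal sketch of that contract follows, with illustrative names rather than JmsIO's actual fields:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.beam.sdk.io.UnboundedSource;

// Minimal sketch of the ACK-on-checkpoint-finalization pattern JB describes
// below; names are illustrative, not taken from JmsIO's implementation.
class JmsCheckpointMarkSketch implements UnboundedSource.CheckpointMark {
  // Messages read since the last checkpoint, waiting for a client ACK.
  private final List<Message> pending = new ArrayList<>();

  void add(Message message) {
    pending.add(message);
  }

  @Override
  public void finalizeCheckpoint() throws IOException {
    // The client ACK is sent only after the runner has durably checkpointed,
    // so messages from an unfinalized bundle are redelivered by the broker
    // after a failure instead of being silently dropped.
    try {
      for (Message message : pending) {
        message.acknowledge();
      }
      pending.clear();
    } catch (JMSException e) {
      throw new IOException(e);
    }
  }
}

If that contract holds, messages from unfinalized bundles should be redelivered rather than lost, which makes the missing half of the messages worth investigating on the ACK or write side.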



On Sun, Feb 11, 2018 at 8:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:

> Hi Motty,
>
> For JMS, it depends if you are using queues or topics.
>
> Using queues, JmsIO creates several readers (concurrent consumers) on the
> same queue. The checkpoint is based on the ACK (it's a client ACK, and the
> source sends the ACK when the checkpoint is finalized). If you close a
> connection for one source, the other sources should continue to consume.
>
> Can you explain your exact scenario (runner, pipeline, broker)?
>
> Regards
> JB
>
> On 02/11/2018 07:43 AM, Motty Gruda wrote:
> > Hey,
> >
> > How can errors in the IOs be handled (for example, connection errors)?
> >
> > I've tested a few scenarios with JmsIO. When I read from two different
> > JMS connections and closed only one of them, the entire pipeline
> > failed/froze. I would expect it to continue running with one source and
> > try to reconnect to the second source until it's available again.
> >
> > Is this a bug in the IO itself? In the SDK? In the runner (I've tested
> > with the direct runner and the Spark runner)?
>
> --
> Jean-Baptiste Onofré
> jbonofre@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>



-- 
מוטי גרודה
Motty Gruda
