beam-user mailing list archives

From Jean-Baptiste Onofré <jb@nanthrax.net>
Subject Re: Handling errors in IOs
Date Mon, 12 Feb 2018 03:30:12 GMT
Hi,

Here you don't use a split: these are different JmsIO reads from different 
queues (not the same one), so the two reads are not related.

If you kill the connection for one of them, you have to reconnect. That can 
be done through configuration on the ConnectionFactory.
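
For example, with ActiveMQ you can wrap the broker URL in the failover 
transport so the client reconnects on its own. A rough sketch (untested 
here; the option values are just examples):

// ActiveMQ failover transport retries the connection automatically;
// maxReconnectAttempts=-1 keeps retrying forever (ActiveMQ 5.4+).
ConnectionFactory factory = new ActiveMQConnectionFactory(
    "failover:(tcp://activemq-a:61616)?initialReconnectDelay=1000&maxReconnectAttempts=-1");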

Is that what you want? Automatic reconnection?

Regards
JB

On 11/02/2018 12:58, Motty Gruda wrote:
> runner: spark runner (1.6.3)
> beam: 2.2.0
> activemq: 5.14.3
> 
> code:
> 
> Pipeline p = Pipeline.create(
>     PipelineOptionsFactory.fromArgs(args).withValidation().create());
> 
> ConnectionFactory factory_a = new ActiveMQConnectionFactory("tcp://activemq-a:61616");
> ConnectionFactory factory_b = new ActiveMQConnectionFactory("tcp://activemq-b:61616");
> ConnectionFactory factory_c = new ActiveMQConnectionFactory("tcp://activemq-c:61616");
> 
> PCollection<JmsRecord> a = p.apply("ReadFromQueue",
>     JmsIO.read().withConnectionFactory(factory_a)
>         .withUsername("admin").withPassword("admin").withQueue("a"));
> PCollection<JmsRecord> b = p.apply("ReadFromQueue2",
>     JmsIO.read().withConnectionFactory(factory_b)
>         .withUsername("admin").withPassword("admin").withQueue("b"));
> 
> PCollection<JmsRecord> combined =
>     PCollectionList.of(a).and(b).apply(Flatten.pCollections());
> 
> combined
>     .apply(MapElements.into(TypeDescriptors.strings())
>         .via((JmsRecord r) -> r.getPayload()))
>     .apply("WriteToQueue",
>         JmsIO.write().withConnectionFactory(factory_c)
>             .withUsername("admin").withPassword("admin").withQueue("c"));
> 
> p.run().waitUntilFinish();
> 
> After killing activemq-a, the job dies with the following stack trace:
> 
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1569_0 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1565_0 not found, computing it
> 18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Got assigned task 924
> 18/02/11 11:39:59 INFO Executor: Running task 1.2 in stage 349.0 (TID 924)
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1561_0 not found, computing it
> 18/02/11 11:39:59 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1673_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1557_0 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1669_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1553_0 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1665_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1549_0 not found, computing it
> 18/02/11 11:39:59 INFO BlockManager: Found block rdd_1528_0 locally
> 18/02/11 11:39:59 INFO StateSpecFunctions: Continue reading from an existing CheckpointMark.
> 18/02/11 11:39:59 ERROR Executor: Exception in task 0.1 in stage 349.0 (TID 923)
> java.lang.RuntimeException: Failed to read from reader.
> 	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:204)
> 	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:105)
> 	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:180)
> 	at org.apache.spark.streaming.StateSpec$$anonfun$1.apply(StateSpec.scala:179)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:57)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$$anonfun$updateRecordWithData$1.apply(MapWithStateRDD.scala:55)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDDRecord$.updateRecordWithData(MapWithStateRDD.scala:55)
> 	at org.apache.spark.streaming.rdd.MapWithStateRDD.compute(MapWithStateRDD.scala:155)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
> 	... (the preceding four frames, MapWithStateRDD.compute / RDD.computeOrReadCheckpoint / CacheManager.getOrCompute / RDD.iterator, repeat another 31 times) ...
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: java.lang.NullPointerException
> 	at org.apache.beam.sdk.io.jms.JmsIO$UnboundedJmsReader.advance(JmsIO.java:472)
> 	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.advanceWithBackoff(MicrobatchSource.java:249)
> 	at org.apache.beam.runners.spark.io.MicrobatchSource$Reader.start(MicrobatchSource.java:227)
> 	at org.apache.beam.runners.spark.stateful.StateSpecFunctions$1.apply(StateSpecFunctions.java:168)
> 	... 148 more
> Caused by: java.lang.NullPointerException
> 	at org.apache.beam.sdk.io.jms.JmsIO$UnboundedJmsReader.advance(JmsIO.java:436)
> 	... 151 more
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1661_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1657_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1653_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1649_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1645_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1641_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1637_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1633_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1629_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1625_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1621_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1617_1 not found, computing it
> 18/02/11 11:39:59 INFO CacheManager: Partition rdd_1613_1 not found, computing it
> 
> 
> 
> Trying to run the same code on Spark 2.2.0 and Beam 2.3.0-rc3, only half 
> of the messages sent ended up in queue c!
> 
> 
> 
> On Sun, Feb 11, 2018 at 8:56 AM, Jean-Baptiste Onofré <jb@nanthrax.net> wrote:
> 
>     Hi Motty,
> 
>     For JMS, it depends if you are using queues or topics.
> 
>     Using queues, JmsIO creates several readers (concurrent consumers) on
>     the same queue. The checkpoint is based on the ACK (it's a client ACK,
>     and the source sends the ACK when the checkpoint is finalized). If you
>     close the connection for one source, the other sources should continue
>     to consume.
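> 
>     Roughly, the idea looks like this (a simplified sketch of the
>     ACK-on-finalize pattern, not the actual JmsIO code):
> 
>     // Simplified sketch: messages are ACKed only once the checkpoint is
>     // finalized, so anything not yet finalized is redelivered by the broker.
>     class AckOnFinalizeMark implements UnboundedSource.CheckpointMark {
>       private final List<javax.jms.Message> pending = new ArrayList<>();
> 
>       @Override
>       public void finalizeCheckpoint() throws IOException {
>         try {
>           for (javax.jms.Message message : pending) {
>             message.acknowledge(); // client ACK back to the broker
>           }
>           pending.clear();
>         } catch (javax.jms.JMSException e) {
>           throw new IOException(e);
>         }
>       }
>     }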
> 
>     Can you explain your exact scenario (runner, pipeline, broker)?
> 
>     Regards
>     JB
> 
>     On 02/11/2018 07:43 AM, Motty Gruda wrote:
>      > Hey,
>      >
>      > How can errors in the IOs be handled (for example, connection
>      > errors)?
>      >
>      > I've tested a few scenarios with the JmsIO. When I read from two
>      > different JMS connections and closed only one of them, the entire
>      > pipeline failed/froze. I would expect it to continue running with
>      > one source and try to reconnect to the second source until it's
>      > available again.
>      >
>      > Is this a bug in the IO itself? In the SDK? In the runner (I've
>      > tested with the direct runner and the Spark runner)?
> 
>     --
>     Jean-Baptiste Onofré
>     jbonofre@apache.org
>     http://blog.nanthrax.net
>     Talend - http://www.talend.com
> 
> 
> 
> 
> -- 
> Motty Gruda (מוטי גרודה)
