nifi-users mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject RE: NiFi-Spark receiver not serializable
Date Fri, 04 Sep 2015 16:19:48 GMT

Michael,

It looks like it is actually not the Runnable that needs to be serializable but rather the
inner class of the Runnable (referenced as ReceiveRunnable$1 in the serialization trace). This is
the implementation of the NiFiDataPacket interface. I have created a ticket to address this:

https://issues.apache.org/jira/browse/NIFI-927
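
To illustrate why the inner class matters, here is a minimal sketch, not the actual NiFiReceiver code and not necessarily how NIFI-927 will be resolved. The names Packet, Receiver, StandalonePacket and outerRefs are hypothetical stand-ins. An inner class that uses its enclosing instance carries a hidden this$N field pointing at it, so a field-by-field serializer walks from the packet into the receiver and everything the receiver references. A static nested class has no such field. (Older compilers always add the hidden field; newer javac releases may omit it when the enclosing instance is unused, so the sketch reads an outer field to make the capture explicit.)

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

public class InnerClassCapture {

    /** Hypothetical, simplified stand-in for a data packet interface. */
    public interface Packet {
        byte[] getContent();
        String getSource();
    }

    /** Hypothetical stand-in for a receiver that owns non-serializable state. */
    public static class Receiver {
        final String name = "receiver-1";
        final Thread worker = new Thread(); // state we never want serialized

        /** Anonymous inner class: reads 'name', so it keeps a reference to this Receiver. */
        public Packet anonymousPacket(final byte[] data) {
            return new Packet() {
                @Override public byte[] getContent() { return data; }
                @Override public String getSource() { return name; }
            };
        }

        /** Static nested class: only the values passed in are retained. */
        public Packet standalonePacket(byte[] data) {
            return new StandalonePacket(data, name);
        }
    }

    /** Holds copies of the values it needs and no reference to any Receiver. */
    public static class StandalonePacket implements Packet {
        private final byte[] content;
        private final String source;

        StandalonePacket(byte[] content, String source) {
            this.content = content;
            this.source = source;
        }

        @Override public byte[] getContent() { return content; }
        @Override public String getSource() { return source; }
    }

    /** Lists compiler-generated fields that point at an enclosing instance. */
    public static List<String> outerRefs(Object o) {
        List<String> names = new ArrayList<>();
        for (Field f : o.getClass().getDeclaredFields()) {
            if (f.getName().startsWith("this$")) {
                names.add(f.getName());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Receiver r = new Receiver();
        byte[] data = {1, 2, 3};
        System.out.println("anonymous:  " + outerRefs(r.anonymousPacket(data)));
        System.out.println("standalone: " + outerRefs(r.standalonePacket(data)));
    }
}
```

Under these assumptions, the first list is non-empty and the second is empty, which is the difference a serializer such as Kryo's FieldSerializer sees when it walks an object's fields.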

Thanks
-Mark

________________________________
> From: michael.griffiths3@baesystems.com 
> To: users@nifi.apache.org 
> Subject: NiFi-Spark receiver not serializable 
> Date: Fri, 4 Sep 2015 16:02:10 +0000 
> 
> 
> Hi all, 
> 
> 
> 
> I keep hitting an odd error that occurs frequently but not always (a 
> Spark job can run for ~10 minutes before it is thrown). The 
> NiFiReceiver’s Runnable cannot be serialized because it is being 
> modified while it is being serialized. The full stack 
> trace is below. I’ve looked through the NiFiReceiver code, and even 
> took a copy and made the Runnable implement Serializable, but that did 
> not solve it either. 
> 
> 
> 
> Has anyone seen this before? To be frank, I’m unsure whether it’s the 
> receiver or my own code affecting the receiver. 
> 
> 
> 
> Many thanks, 
> 
> 
> 
> Michael 
> 
> 
> 
> 15/09/04 16:55:39 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 48.0 (TID 411, slave6.localdomain): java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
> 
> Serialization trace:
> classes (sun.misc.Launcher$AppClassLoader)
> classloader (java.security.ProtectionDomain)
> cachedPDs (javax.security.auth.SubjectDomainCombiner)
> combiner (java.security.AccessControlContext)
> acc (org.apache.spark.util.MutableURLClassLoader)
> contextClassLoader (org.apache.spark.streaming.util.RecurringTimer$$anon$1)
> thread (org.apache.spark.streaming.util.RecurringTimer)
> blockIntervalTimer (org.apache.spark.streaming.receiver.BlockGenerator)
> blockGenerator (org.apache.spark.streaming.receiver.ReceiverSupervisorImpl)
> executor_ (org.apache.nifi.spark.NiFiReceiver)
> this$0 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable)
> this$1 (org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable$1)
> 
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:79)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>     at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>     at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:148)
>     at org.apache.spark.serializer.SerializationStream.writeAll(Serializer.scala:153)
>     at org.apache.spark.storage.BlockManager.dataSerializeStream(BlockManager.scala:1189)
>     at org.apache.spark.storage.BlockManager.dataSerialize(BlockManager.scala:1198)
>     at org.apache.spark.storage.MemoryStore.getBytes(MemoryStore.scala:190)
>     at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:480)
>     at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>     at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>     at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>     at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>     at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>     at java.util.Vector$Itr.checkForComodification(Vector.java:1184)
>     at java.util.Vector$Itr.next(Vector.java:1137)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:67)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18)
>     at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>     ... 78 more
> 
> 
> 
>     at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>     at java.lang.Thread.run(Thread.java:745)
> 
> 
> 
> 
> 
> 
> 
> Michael Griffiths 
> NationProtect Developer 
> BAE Systems Applied Intelligence 
> ___________________________________________________________ 
> 
> E: michael.griffiths3@baesystems.com 
> 
> BAE Systems Applied Intelligence, Surrey Research Park, Guildford, 
> Surrey, GU2 7RQ. 
> www.baesystems.com/ai<http://www.baesystems.com/ai> 
> 
> 
> 