apex-users mailing list archives

From Sandesh Hegde <sand...@datatorrent.com>
Subject Re: AvroToPojo Operator doesn't recover after failure and keeps throwing Kryo exception
Date Wed, 31 May 2017 04:29:38 GMT
AvroToPojo has a bug: the transient modifier needs to be added to two fields.

private transient List<FieldInfo> fieldInfos;
private transient List<ActiveFieldInfo> columnFieldSetters;
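
To illustrate why marking these fields transient helps, here is a minimal
sketch. It is not the Malhar code: the Converter class, its fieldMapping
property, and its setup() method are hypothetical, and java.io serialization
stands in for Kryo (both skip transient fields by default). It assumes the
derived lists can be rebuilt from configuration when the operator is set up
again after a restore.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class TransientFieldSketch {

  /** Hypothetical stand-in for an operator with checkpointed config and derived state. */
  static class Converter implements Serializable {
    private static final long serialVersionUID = 1L;

    // Configuration: small and serializable, so it belongs in the checkpoint.
    private String fieldMapping = "id:long,name:string";

    // Derived state: excluded from the checkpoint and rebuilt in setup().
    private transient List<String> columnSetters;

    /** Rebuilds the derived list from the configuration (assumed to run after restore). */
    void setup() {
      columnSetters = new ArrayList<>();
      for (String entry : fieldMapping.split(",")) {
        columnSetters.add(entry.split(":")[0]);
      }
    }

    List<String> getColumnSetters() {
      return columnSetters;
    }
  }

  /** Serializes and deserializes the object, roughly as a checkpoint/restore cycle would. */
  static Converter roundTrip(Converter in) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(in);
    }
    try (ObjectInputStream ois =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Converter) ois.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    Converter original = new Converter();
    original.setup();

    Converter restored = roundTrip(original);
    // The transient field was never written, so nothing had to be instantiated for it.
    System.out.println(restored.getColumnSetters()); // prints "null"

    restored.setup();
    System.out.println(restored.getColumnSetters()); // prints "[id, name]"
  }
}
```

Because the transient field is never written to the checkpoint, the
serializer never has to instantiate its element type on restore, which is
the step that fails in the trace quoted below.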

There is also a second bug, in the Avro input operator; a PR is open for
it.
The fix is to add the line below to the operator's beginWindow method:
super.beginWindow(windowId);
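
As a rough sketch of what the missing call changes, the example below uses
hypothetical classes (BaseInputOperator, BrokenReader, FixedReader) rather
than the real Malhar operators. It only assumes that the parent class does
some per-window bookkeeping in beginWindow, which an override silently skips
unless it calls super.

```java
public class BeginWindowSketch {

  /** Hypothetical parent that tracks the current window for its own bookkeeping. */
  static class BaseInputOperator {
    protected long currentWindowId = -1;

    public void beginWindow(long windowId) {
      currentWindowId = windowId;
    }
  }

  /** Override that forgets to call super, so the parent never sees the window. */
  static class BrokenReader extends BaseInputOperator {
    int recordsInWindow;

    @Override
    public void beginWindow(long windowId) {
      recordsInWindow = 0;
    }
  }

  /** Override with the fix: delegate to the parent first, then do local work. */
  static class FixedReader extends BaseInputOperator {
    int recordsInWindow;

    @Override
    public void beginWindow(long windowId) {
      super.beginWindow(windowId);
      recordsInWindow = 0;
    }
  }

  public static void main(String[] args) {
    BrokenReader broken = new BrokenReader();
    broken.beginWindow(42L);
    System.out.println(broken.currentWindowId); // prints "-1"

    FixedReader fixed = new FixedReader();
    fixed.beginWindow(42L);
    System.out.println(fixed.currentWindowId); // prints "42"
  }
}
```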

In both cases, you can copy the operator code into your own repo and make
the changes mentioned above.

Thanks



On Tue, May 30, 2017 at 6:42 PM Vivek Bhide <bhide.vivek@gmail.com> wrote:

> I am using the AvroToPojo Malhar operator in conjunction with
> AvroFileInputOperator to convert Avro records to POJOs. While testing the
> application's stability, I found that the AvroToPojo operator does not
> recover after a failure and keeps throwing the exception below. This in
> turn makes the whole application unstable, so it has to be killed.
>
> The field type named in the error, 'ActiveFieldInfo', is a static inner
> class, and I am not sure what can be done to have the operator recover
> without any trouble.
>
> Any pointers on this issue would be really helpful.
>
> 2017-05-30 17:15:46,826 INFO  stram.StreamingContainerParent
> (StreamingContainerParent.java:log(170)) - child msg: deploy request
> failed:
>
> [OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=
> brdn2244.target.com]]],
>
> OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
> com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
> Serialization trace:
> columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
>         at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>         at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
>         at com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
>         at com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
>         at com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
>         at com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
>         at com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
>         at com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
>  context:
>
> PTContainer[id=1(container_e21_1491404336779_1770158_01_000018),state=ACTIVE,operators=[PTOperator[id=2,name=fileReader$avroToPojo,state=PENDING_DEPLOY],
> PTOperator[id=1,name=fileReader$fileReader,state=PENDING_DEPLOY]]]
> 2017-05-30 17:15:46,832 INFO  stram.StreamingContainerParent
> (StreamingContainerParent.java:log(170)) - child msg:
> java.lang.IllegalStateException: Deploy request failed:
>
> [OperatorDeployInfo[id=2,name=fileReader$avroToPojo,type=GENERIC,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[OperatorDeployInfo.InputDeployInfo[portName=data,streamId=fileReader$avrotopojostream,sourceNodeId=1,sourcePortName=output,locality=CONTAINER_LOCAL,partitionMask=0,partitionKeys=<null>]],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=convertAuditRecordToPojo,bufferServer=
> brdn2244.target.com]]],
>
> OperatorDeployInfo[id=1,name=fileReader$fileReader,type=INPUT,checkpoint={592dee04000000b3,
> 0,
>
> 0},inputs=[],outputs=[OperatorDeployInfo.OutputDeployInfo[portName=output,streamId=fileReader$avrotopojostream,bufferServer=<null>]]]]
>         at com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:836)
>         at com.datatorrent.stram.engine.StreamingContainer.heartbeatLoop(StreamingContainer.java:708)
>         at com.datatorrent.stram.engine.StreamingContainer.main(StreamingContainer.java:313)
> Caused by: com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): com.datatorrent.contrib.avro.AvroToPojo$ActiveFieldInfo
> Serialization trace:
> columnFieldSetters (com.datatorrent.contrib.avro.AvroToPojo)
>         at com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java:1228)
>         at com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1049)
>         at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1058)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.create(FieldSerializer.java:547)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:523)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
>         at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>         at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>         at com.datatorrent.common.util.FSStorageAgent.retrieve(FSStorageAgent.java:200)
>         at com.datatorrent.common.util.FSStorageAgent.load(FSStorageAgent.java:139)
>         at com.datatorrent.stram.engine.StreamingContainer.deployNodes(StreamingContainer.java:935)
>         at com.datatorrent.stram.engine.StreamingContainer.deploy(StreamingContainer.java:883)
>         at com.datatorrent.stram.engine.StreamingContainer.processHeartbeatResponse(StreamingContainer.java:827)
>         ... 2 more
>
>
> Regards
> Vivek
>
