From: "Tzu-Li (Gordon) Tai"
To: Stephan Ewen, user@flink.apache.org
Date: Wed, 8 Mar 2017 00:50:24 +0800
Subject: Re: AWS exception serialization problem

Hi,

I just had a quick look at this, but the Kafka fetcher thread’s context classloader doesn’t seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the fetcher, but since the task thread sets the user code classloader as its context classloader, shouldn’t any threads created from it (i.e., the fetcher thread) use it also?

I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 was using, and it’s `FlinkUserCodeClassLoader`.
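
The inheritance behaviour described above can be checked in isolation with plain JDK classes. The following is only a minimal sketch, not Flink code: the class name `ContextClassLoaderInheritance`, the helper `contextLoaderSeenByChild`, and the empty `URLClassLoader` standing in for the user code classloader are all assumptions made for illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

public class ContextClassLoaderInheritance {

    /** Starts a child thread from the calling thread and returns the context classloader the child observes. */
    static ClassLoader contextLoaderSeenByChild() {
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        // The child copies the creator's context classloader when the Thread object is constructed.
        Thread child = new Thread(() -> seen.set(Thread.currentThread().getContextClassLoader()));
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for child thread", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the user code classloader (no JARs attached).
        ClassLoader userCodeLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderInheritance.class.getClassLoader());

        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        current.setContextClassLoader(userCodeLoader);
        try {
            ClassLoader inherited = contextLoaderSeenByChild();
            System.out.println("child uses parent's context classloader: " + (inherited == userCodeLoader));
        } finally {
            // Restore the previous loader so the sketch has no side effects.
            current.setContextClassLoader(original);
        }
    }
}
```

Note that the loader is copied when the child `Thread` is constructed, so a thread created before the parent sets its context classloader, or one taken from a shared pool, would not necessarily see it.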


On March 7, 2017 at 7:32:35 PM, Stephan Ewen (sewen@apache.org) wrote:

Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the user-code class loader set as the context class loader. Kryo relies on that for class resolution.

What Flink version are you on? I think that actual processing and forwarding does not happen in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <scarey@expedia.com> wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to copy the exception object, it does that by serializing and deserializing it. For subclasses of RuntimeException, it doesn't know how to do it so it delegates serialization to Java. However, it doesn't use a custom ObjectInputStream to override resolveClass() and provide classes from the user code classloader… such as happens in RocksDBStateBackend's use of InstantiationUtil.deserializeObject(). Instead, it uses ObjectInputStream$latestUserDefinedLoader() which is the Launcher$AppClassLoader which definitely doesn't have the user code in it.

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being configured?
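
For illustration, the kind of resolveClass() override mentioned above could look roughly like the sketch below. It uses only JDK classes and is not Flink's or Kryo's actual code; the names `ClassLoaderAwareCopy`, `ClassLoaderObjectInputStream`, and `copy` are hypothetical, and the caller is assumed to have the user code classloader at hand.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

public class ClassLoaderAwareCopy {

    /** ObjectInputStream that resolves classes through an explicitly supplied classloader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            try {
                // Look in the supplied loader first instead of the default "latest user-defined loader".
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Deep-copies a serializable value via Java serialization, resolving classes with the given loader. */
    @SuppressWarnings("unchecked")
    static <T> T copy(T value, ClassLoader classLoader) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Copy via Java serialization failed", e);
        }
    }

    public static void main(String[] args) {
        RuntimeException original = new IllegalStateException("boom", new RuntimeException("cause"));
        // In a real job this would be the user code classloader; here the context loader stands in for it.
        RuntimeException copied = copy(original, Thread.currentThread().getContextClassLoader());
        System.out.println(copied.getClass().getName() + ": " + copied.getMessage());
    }
}
```

With a plain ObjectInputStream, the same readObject() call resolves classes through the default lookup, which is where the ClassNotFoundException in the stack trace below originates.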

Thanks,
Shannon


From: Shannon Carey <scarey@expedia.com>
Date: Monday, March 6, 2017 at 7:09 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh. I notice that there only appears to be one Java process. The job manager and the task manager run in the same JVM, right? I notice, however, that there are two blob store folders on disk. Could the problem be caused by two different FlinkUserCodeClassLoader objects pointing to the two different JARs?


From: Shannon Carey <scarey@expedia.com>
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: AWS exception serialization problem

Has anyone encountered this or know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
        at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
        ... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
        ... 12 more
