flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: AWS exception serialization problem
Date Tue, 07 Mar 2017 16:50:24 GMT
Hi,

I just had a quick look at this, but the Kafka fetcher thread’s context classloader doesn’t
seem to be the issue (at least for 1.1.4).

In Flink 1.1.4, a separate thread from the task thread is created to run the fetcher, but
since the task thread sets the user code classloader as its context classloader, shouldn’t
any threads created from it (i.e., the fetcher thread) use it also?

I quickly checked the context classloader that the Kafka09Fetcher thread in 1.1.4 was using, and
it’s `FlinkUserCodeClassLoader`.
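
To illustrate the inheritance argument above, here is a minimal plain-Java sketch, not Flink code. It
assumes a hypothetical helper class `ContextClassLoaderInheritance`; the child thread stands in
for the fetcher thread and the calling thread stands in for the task thread. It only shows the
JDK behaviour that a new thread takes the context classloader of the thread that constructs it.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper for illustration only; none of these names come from Flink.
final class ContextClassLoaderInheritance {

    /**
     * Sets the given loader as the calling thread's context classloader, creates a
     * child thread, and returns the context classloader that the child observed.
     */
    static ClassLoader loaderSeenByChild(ClassLoader parentContextLoader)
            throws InterruptedException {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        AtomicReference<ClassLoader> seen = new AtomicReference<>();

        current.setContextClassLoader(parentContextLoader);
        try {
            // The child picks up the creator's context classloader when it is constructed.
            Thread child = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()),
                    "stand-in-fetcher");
            child.start();
            child.join();
        } finally {
            // Restore the caller's original context classloader.
            current.setContextClassLoader(previous);
        }
        return seen.get();
    }
}
```

Note that the inheritance happens at thread construction time, so a thread created elsewhere
(for example, one taken from a shared pool) would not pick up the loader this way.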


On March 7, 2017 at 7:32:35 PM, Stephan Ewen (sewen@apache.org) wrote:

Ah, I see...

The issue is that the Kafka fetcher thread apparently does not have the user-code class loader
set as the context class loader. Kryo relies on that for class resolution.

What Flink version are you on? I think that actual processing and forwarding does not happen
in the Kafka Fetchers any more as of 1.2, so only Flink 1.1 should be affected...


On Tue, Mar 7, 2017 at 2:43 AM, Shannon Carey <scarey@expedia.com> wrote:
I think my previous guess was wrong. From what I can tell, when Kryo tries to copy the exception
object, it does that by serializing and deserializing it. For subclasses of RuntimeException,
it doesn't know how to do it so it delegates serialization to Java. However, it doesn't use
a custom ObjectInputStream to override resolveClass() and provide classes from the user
code classloader… such as happens in RocksDBStateBackend's use of InstantiationUtil.deserializeObject().
Instead, it uses ObjectInputStream#latestUserDefinedLoader(), which returns the Launcher$AppClassLoader,
which definitely doesn't have the user code in it.
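
For reference, the kind of custom ObjectInputStream described above can be sketched roughly as
follows. This is a generic illustration, not the code from InstantiationUtil or from Kryo's
JavaSerializer; the names `ClassLoaderAwareCopy`, `ClassLoaderObjectInputStream` and `copy`
are made up for the example. It assumes the caller can supply the classloader that holds the
user code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

// Hypothetical names for illustration; this is not Flink's or Kryo's implementation.
final class ClassLoaderAwareCopy {

    /** An ObjectInputStream that resolves classes against an explicitly supplied loader. */
    static final class ClassLoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
                throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied loader first.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }

    /** Copies an object by Java-serializing it and reading it back with the given loader. */
    static <T> T copy(T obj, ClassLoader classLoader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), classLoader)) {
            @SuppressWarnings("unchecked")
            T result = (T) in.readObject();
            return result;
        }
    }
}
```

With a stream like this, an exception class that exists only in the user JAR would be looked up
in the supplied loader rather than in whatever loader latestUserDefinedLoader() happens to return.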

Seems like a bug in TrySerializer#copy? Or somewhere that Kryo is being configured?

Thanks,
Shannon


From: Shannon Carey <scarey@expedia.com>
Date: Monday, March 6, 2017 at 7:09 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: AWS exception serialization problem

This happened when running Flink with bin/run-local.sh. I notice that there only appears to
be one Java process. The job manager and the task manager run in the same JVM, right? I notice,
however, that there are two blob store folders on disk. Could the problem be caused by two
different FlinkUserCodeClassLoader objects pointing to the two different JARs?


From: Shannon Carey <scarey@expedia.com>
Date: Monday, March 6, 2017 at 6:39 PM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: AWS exception serialization problem

Has anyone encountered this, or does anyone know what might be causing it?


java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:394)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:376)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:366)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:349)
        at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:355)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:225)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:253)
        at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:47)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
        at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:172)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:51)
        at org.apache.flink.api.scala.typeutils.TrySerializer.copy(TrySerializer.scala:32)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:389)
        ... 7 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.services.s3.model.AmazonS3Exception
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:348)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:552)
        at java.lang.Throwable.readObject(Throwable.java:914)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:45)
        ... 12 more

