flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0
Date Thu, 04 Jan 2018 10:48:21 GMT
I think this might be happening because partial Hadoop dependencies end up in the user jar while
the rest is only available from the Hadoop dependencies that come bundled with Flink. For example,
I noticed that you have hadoop-common as a dependency, which probably ends up in your jar.
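
As a minimal sketch of the sbt-side fix (assuming a fat jar built with sbt-assembly, and that the
Hadoop bundled with the Flink distribution supplies these classes at runtime), you can mark the
Hadoop dependency as Provided so it is available at compile time but stays out of the assembled jar:

libraryDependencies ++= Seq(
  // Compile against hadoop-common, but do not package it: the Flink
  // distribution already bundles Hadoop, and shipping a second copy in the
  // user jar is what can split RpcEngine/ProtobufRpcEngine across two
  // classloaders.
  "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided
)

The artifact name and version here are simply the ones from the dependency list quoted below. Also
keep in mind that the Flink 1.4 binaries are built against Hadoop 2.x, so a 3.0.0 client is a second
possible source of mismatch.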

> On 4. Jan 2018, at 11:40, Stephan Ewen <sewen@apache.org> wrote:
> 
> Hi!
> 
> This looks indeed like a class-loading issue - it looks like "RpcEngine" and "ProtobufRpcEngine" are loaded via different classloaders.
> 
> Can you try the following:
> 
>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first (see the snippet below)
> 
> If that fixes the issue, then we can look at a way to make this seamless...
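> 
> For reference, a minimal sketch of the corresponding entry in conf/flink-conf.yaml (in 1.4 the
> default for this setting is child-first):
> 
>   classloader.resolve-order: parent-first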
> 
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin.kn@gmail.com> wrote:
> Hello,
> 
> After moving to Flink 1.4.0 I'm getting the following error. I can't find anything online that addresses it. Is it a Hadoop dependency issue? Here are my project dependencies:
> 
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>   "org.apache.avro" % "avro" % "1.7.7",
>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
> )
> Stacktrace:
> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
> Using address localhost:6123 to connect to JobManager.
> JobManager web interface address http://localhost:8082
> Starting execution of program
> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
> 01/03/2018 14:20:52	Job execution switched to status RUNNING.
> 01/03/2018 14:20:52	Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
> 01/03/2018 14:20:52	Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
> 01/03/2018 14:20:53	Source: Kafka -> Sink: S3(1/1) switched to RUNNING
> 01/03/2018 14:20:53	Source: Kafka -> Sink: S3(1/1) switched to FAILED
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> 	at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> 	at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> 	... 9 more
> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
> 	at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
> 	at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
> 	at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
> 	at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
> 	at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> 	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
> 	at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
> 	... 13 more
> 

