flink-user mailing list archives

From Kyle Hamlin <hamlin...@gmail.com>
Subject Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0
Date Fri, 05 Jan 2018 18:18:17 GMT
I have hadoop-common in my build.sbt because, after moving from 1.3.2 to
1.4.0, I had trouble compiling my jar: org.apache.hadoop.fs.{FileSystem, Path}
were no longer provided by Flink, and I use them in my custom bucketer and in
a writer that writes Avro out to Parquet.
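
For reference, scoping that dependency as provided (the change described further down in this message) would look roughly like this in build.sbt. This is a sketch that reuses the 3.0.0 version from the original dependency list and assumes the cluster's Flink installation supplies the Hadoop classes at runtime:

```scala
// build.sbt (sketch): Hadoop stays on the compile classpath, so
// org.apache.hadoop.fs.{FileSystem, Path} still resolve in the custom
// bucketer and writer, but it is left out of the assembled job jar.
// Assumption: Flink on the cluster provides Hadoop at runtime.
libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided
```

Whether 3.0.0 is the right version to compile against is itself an assumption; compiling against the Hadoop version actually present on the cluster is likely the safer choice.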

I tried adding classloader.resolve-order: parent-first to my
flink-conf.yaml, but that didn't seem to work. I grepped my jar for "hadoop"
and found the following:

org/apache/hadoop/*
org/apache/parquet/hadoop/*
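
The same check can be done programmatically. A minimal Scala sketch (the `JarScan` object and `entriesUnder` method are hypothetical names) that lists the entries of a jar under a package prefix using `java.util.jar.JarFile`:

```scala
import java.util.jar.JarFile

object JarScan {
  // Returns the names of all entries in the jar at `jarPath` whose path starts
  // with `prefix` (for example "org/apache/hadoop/"), in jar order.
  def entriesUnder(jarPath: String, prefix: String): List[String] = {
    val jar = new JarFile(jarPath)
    try {
      val names = List.newBuilder[String]
      val it = jar.entries()
      while (it.hasMoreElements) {
        val name = it.nextElement().getName
        if (name.startsWith(prefix)) names += name
      }
      names.result()
    } finally jar.close()
  }

  def main(args: Array[String]): Unit = {
    // args(0): path to the assembled job jar (hypothetical; whatever the build produces)
    val hits = entriesUnder(args(0), "org/apache/hadoop/")
    if (hits.isEmpty) println("no org/apache/hadoop/ classes bundled")
    else hits.foreach(println)
  }
}
```

Unlike a plain grep for "hadoop", the prefix match does not flag org/apache/parquet/hadoop/*, which belongs to parquet-hadoop itself rather than to Hadoop, so after scoping hadoop-common as provided it should report no hits.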

After designating the hadoop-common dependency as "provided", only
org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine" /
"ProtobufRpcEngine" error no longer shows up, just the following:

java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
... 9 more

Moving hadoop-common.jar to Flink's lib/ directory also doesn't appear to
help.
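
Stephan's diagnosis in the thread is that ProtobufRpcEngine and RpcEngine were loaded by different classloaders. The JVM identifies a class by its name together with its defining classloader, so the same bytecode defined by two loaders yields two incompatible types, and a cast between them fails. The sketch below demonstrates that general mechanism with a hypothetical `ChildFirstLoader` and `Marker` class; it is an illustration only, not Flink's actual user-code classloader and not a reproduction of the Hadoop failure.

```scala
import java.io.ByteArrayOutputStream

// A trivial class whose bytecode is defined twice by the demo.
class Marker

// Hypothetical child-first loader for illustration only: it defines the class
// named `target` itself (from the parent's .class bytes) and delegates every
// other class to `parent`, mimicking a user-code loader that prefers classes
// bundled in the job jar.
final class ChildFirstLoader(target: String, parent: ClassLoader)
    extends ClassLoader(parent) {

  override protected def loadClass(name: String, resolve: Boolean): Class[_] =
    if (name != target) super.loadClass(name, resolve)
    else {
      val already = findLoadedClass(name)
      if (already != null) already
      else {
        val bytes = classBytes(name)
        defineClass(name, bytes, 0, bytes.length)
      }
    }

  // Reads the raw .class bytes for `name` through the parent loader.
  private def classBytes(name: String): Array[Byte] = {
    val in = parent.getResourceAsStream(name.replace('.', '/') + ".class")
    require(in != null, s"no bytecode found for $name")
    try {
      val out = new ByteArrayOutputStream()
      val buf = new Array[Byte](4096)
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        n = in.read(buf)
      }
      out.toByteArray
    } finally in.close()
  }
}

object LoaderDemo {
  // Defines Marker through two independent child-first loaders.
  def loadTwice(): (Class[_], Class[_]) = {
    val name = classOf[Marker].getName
    val parent = classOf[Marker].getClassLoader
    (new ChildFirstLoader(name, parent).loadClass(name),
     new ChildFirstLoader(name, parent).loadClass(name))
  }

  def main(args: Array[String]): Unit = {
    val (a, b) = loadTwice()
    println(s"same name: ${a.getName == b.getName}")           // true
    println(s"same class: ${a eq b}")                          // false
    println(s"app Marker is an a: ${a.isInstance(new Marker)}") // false
  }
}
```

With parent-first resolution, `Marker` would be defined once by the parent and both lookups would return the same `Class`, which is why `classloader.resolve-order: parent-first` was the first thing suggested in the thread.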

On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek <aljoscha@apache.org> wrote:

> I think this might be happening because partial Hadoop dependencies are in
> the user jar and the rest is only available from the Hadoop deps that come
> bundled with Flink. For example, I noticed that you have hadoop-common as a
> dependency, which probably ends up in your jar.
>
>
> On 4. Jan 2018, at 11:40, Stephan Ewen <sewen@apache.org> wrote:
>
> Hi!
>
> This looks indeed like a class-loading issue - it looks like "RpcEngine"
> and "ProtobufRpcEngine" are loaded via different classloaders.
>
> Can you try the following:
>
>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
>
> If that fixes the issue, then we can look at a way to make this seamless...
>
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin.kn@gmail.com> wrote:
>
>> Hello,
>>
>> After moving to Flink 1.4.0 I'm getting the following error. I can't find
>> anything online that addresses it. Is it a Hadoop dependency issue? Here
>> are my project dependencies:
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>   "org.apache.avro" % "avro" % "1.7.7",
>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>> )
>>
>> *Stacktrace:*
>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>> Using address localhost:6123 to connect to JobManager.
>> JobManager web interface address http://localhost:8082
>> Starting execution of program
>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>> ... 9 more
>> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
>> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>> ... 13 more
>>
>
>
>
