flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0
Date Thu, 04 Jan 2018 10:46:51 GMT
@Kyle:

Please also check whether your user jar contains any Hadoop classes. There
should be none; Hadoop should only be on the Flink classpath.
Fixing the project's Maven setup (making sure the Hadoop and Flink core
dependencies are marked as provided) should resolve this.

To do that, you can, for example, use the latest quickstart template from
Flink 1.4.
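
As a rough sketch of what "provided" means for the sbt build quoted below
(only an illustration, not a tested configuration): the Flink core and Hadoop
artifacts get the Provided scope so they stay out of the user jar, while the
connectors remain bundled. The flinkVersion value is an assumption; the
artifact names and the Hadoop version are taken from the quoted message.

```scala
// build.sbt (sketch): keep Flink core and Hadoop out of the assembled user jar.
// flinkVersion is an assumed value; adjust it to the cluster's Flink version.
val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  // Flink core APIs: already on the cluster classpath, so mark them Provided.
  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  // Connectors are bundled into the user jar, so they keep the default scope.
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  // Hadoop should come from the Flink classpath only, so mark it Provided too.
  "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided
)
```

Other libraries may still pull in Hadoop classes transitively, so it is worth
listing the contents of the assembled jar to confirm that no org/apache/hadoop
entries remain.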

On Thu, Jan 4, 2018 at 11:40 AM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> This does indeed look like a class-loading issue: it looks like "RpcEngine"
> and "ProtobufRpcEngine" are loaded via different classloaders.
>
> Can you try the following:
>
>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
>
> If that fixes the issue, then we can look at a way to make this seamless...
>
> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin.kn@gmail.com> wrote:
>
>> Hello,
>>
>> After moving to Flink 1.4.0 I'm getting the following error. I can't find
>> anything online that addresses it. Is it a Hadoop dependency issue? Here
>> are my project dependencies:
>>
>> libraryDependencies ++= Seq(
>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>   "org.apache.avro" % "avro" % "1.7.7",
>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>> )
>>
>> *Stacktrace:*
>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>> Using address localhost:6123 to connect to JobManager.
>> JobManager web interface address http://localhost:8082
>> Starting execution of program
>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>> ... 9 more
>> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
>> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>> ... 13 more
>>
>
>
