flink-user mailing list archives

From Kyle Hamlin <hamlin...@gmail.com>
Subject Re: BucketingSink doesn't work anymore moving from 1.3.2 to 1.4.0
Date Fri, 05 Jan 2018 19:50:14 GMT
Also, I'm not using HDFS; I'm trying to sink to S3.

On Fri, Jan 5, 2018 at 6:18 PM Kyle Hamlin <hamlin.kn@gmail.com> wrote:

> I have the hadoop-common jar in my build.sbt because I was having issues
> compiling my jar after moving from 1.3.2 to 1.4.0:
> org.apache.hadoop.fs.{FileSystem, Path} were no longer in Flink, and I use
> them in my custom bucketer and in my writer to write Avro out to Parquet.
>
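> For context, my bucketer is along these lines (a simplified sketch, not my
> actual code; the record type and date format are illustrative). This is why
> org.apache.hadoop.fs.Path has to be on my compile classpath:
>
> import org.apache.flink.streaming.connectors.fs.Clock
> import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer
> import org.apache.hadoop.fs.Path
>
> import java.text.SimpleDateFormat
> import java.util.Date
>
> // Buckets records into date-named subdirectories under the base path.
> class DateBucketer extends Bucketer[String] {
>   @transient private lazy val fmt = new SimpleDateFormat("yyyy-MM-dd")
>
>   override def getBucketPath(clock: Clock, basePath: Path, element: String): Path =
>     new Path(basePath, fmt.format(new Date(clock.currentTimeMillis())))
> }
>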
> I tried adding classloader.resolve-order: parent-first to my
> flink-conf.yaml but that didn't seem to work. I grepped my jar for "hadoop"
> and found the following:
>
> org/apache/hadoop/*
> org/apache/parquet/hadoop/*
>
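> For completeness, the check itself was roughly this (the assembly jar name
> is illustrative):
>
> jar tf target/scala-2.11/myjob-assembly.jar | grep -i hadoop
>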
> After designating the hadoop-common dependency as "provided" (the exact
> build.sbt change is shown after the stack trace below), only
> org/apache/parquet/hadoop/* files show up. Additionally, the "RpcEngine"
> and "ProtobufRpcEngine" error doesn't show up anymore, just the following:
>
> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
> ... 9 more
>
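> For reference, the "provided" change mentioned above was this one line in
> build.sbt:
>
> "org.apache.hadoop" % "hadoop-common" % "3.0.0" % Provided
>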
> Moving the hadoop-common jar to Flink's lib/ directory also doesn't appear
> to help.
>
>
> On Thu, Jan 4, 2018 at 10:48 AM Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> I think this might be happening because partial Hadoop dependencies are
>> in the user jar and the rest is only available from the Hadoop deps that
>> come bundled with Flink. For example, I noticed that you have hadoop-common
>> as a dependency, which probably ends up in your jar.
>>
>>
>> On 4. Jan 2018, at 11:40, Stephan Ewen <sewen@apache.org> wrote:
>>
>> Hi!
>>
>> This looks indeed like a class-loading issue - it looks like "RpcEngine"
>> and "ProtobufRpcEngine" are loaded via different classloaders.
>>
>> Can you try the following:
>>
>>   - In your flink-conf.yaml, set classloader.resolve-order: parent-first
>>
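>> That is, in flink-conf.yaml (the default in 1.4 is child-first, which is
>> what lets two copies of the Hadoop classes clash):
>>
>> classloader.resolve-order: parent-first
>>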
>> If that fixes the issue, then we can look at a way to make this
>> seamless...
>>
>> On Wed, Jan 3, 2018 at 8:31 PM, Kyle Hamlin <hamlin.kn@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> After moving to Flink 1.4.0 I'm getting the following error. I can't
>>> find anything online that addresses it. Is it a Hadoop dependency issue?
>>> Here are my project dependencies:
>>>
>>> libraryDependencies ++= Seq(
>>>   "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
>>>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
>>>   "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion,
>>>   "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
>>>   "org.apache.flink" % "flink-metrics-core" % flinkVersion,
>>>   "org.apache.flink" % "flink-metrics-graphite" % flinkVersion,
>>>   "org.apache.kafka" %% "kafka" % "0.10.0.1",
>>>   "org.apache.avro" % "avro" % "1.7.7",
>>>   "org.apache.parquet" % "parquet-hadoop" % "1.8.1",
>>>   "org.apache.parquet" % "parquet-avro" % "1.8.1",
>>>   "io.confluent" % "kafka-avro-serializer" % "3.2.0",
>>>   "org.apache.hadoop" % "hadoop-common" % "3.0.0"
>>> )
>>>
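>>> For reference, the sink is wired up roughly like this (a simplified
>>> sketch; the bucket path, types, and writer are illustrative rather than my
>>> exact code, which uses a custom bucketer and a Parquet writer):
>>>
>>> import org.apache.flink.streaming.connectors.fs.StringWriter
>>> import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
>>>
>>> // stream is the DataStream[String] coming from the Kafka source
>>> val sink = new BucketingSink[String]("s3://my-bucket/out")
>>>   .setBucketer(new DateTimeBucketer[String]("yyyy-MM-dd--HH"))
>>>   .setWriter(new StringWriter[String]())
>>> stream.addSink(sink)
>>>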
>>> *Stacktrace:*
>>> Cluster configuration: Standalone cluster with JobManager at localhost/127.0.0.1:6123
>>> Using address localhost:6123 to connect to JobManager.
>>> JobManager web interface address http://localhost:8082
>>> Starting execution of program
>>> Submitting job with JobID: b6ed965410dad61f96f8dec73b614a9f. Waiting for job completion.
>>> Connected to JobManager at Actor[akka.tcp://flink@localhost:6123/user/jobmanager#-1321297259] with leader session id 00000000-0000-0000-0000-000000000000.
>>> 01/03/2018 14:20:52 Job execution switched to status RUNNING.
>>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to SCHEDULED
>>> 01/03/2018 14:20:52 Source: Kafka -> Sink: S3(1/1) switched to DEPLOYING
>>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to RUNNING
>>> 01/03/2018 14:20:53 Source: Kafka -> Sink: S3(1/1) switched to FAILED
>>> java.lang.RuntimeException: Error while creating FileSystem when initializing the state of the BucketingSink.
>>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>>> at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.io.IOException: Cannot instantiate file system for URI: hdfs://localhost:12345/
>>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:187)
>>> at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:401)
>>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1154)
>>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
>>> at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
>>> ... 9 more
>>> Caused by: java.lang.ClassCastException: org.apache.hadoop.ipc.ProtobufRpcEngine cannot be cast to org.apache.hadoop.ipc.RpcEngine
>>> at org.apache.hadoop.ipc.RPC.getProtocolEngine(RPC.java:207)
>>> at org.apache.hadoop.ipc.RPC.getProtocolProxy(RPC.java:579)
>>> at org.apache.hadoop.hdfs.NameNodeProxies.createNNProxyWithClientProtocol(NameNodeProxies.java:418)
>>> at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:314)
>>> at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
>>> at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
>>> at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
>>> at org.apache.flink.runtime.fs.hdfs.HadoopFsFactory.create(HadoopFsFactory.java:159)
>>> ... 13 more
>>>
>>
>>
>>
