flink-user mailing list archives

From Ken Krugler <kkrugler_li...@transpac.com>
Subject Re: StreamingFileSink in version 1.8
Date Tue, 11 Jun 2019 13:48:31 GMT
The code in HadoopRecoverableWriter is:

		if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
			throw new UnsupportedOperationException(
					"Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
		}

So one possibility is that your sink path doesn’t have the explicit hdfs://xxx protocol.
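To make the two conditions in that guard concrete, here is a simplified, hypothetical Java model; it is not Flink's code. `passesCheck` mirrors the scheme and version test, reading `isMinHadoopVersion(2, 7)` as "major > 2, or major == 2 and minor >= 7" (an assumed interpretation). `effectiveScheme` approximates which scheme a sink path ends up with, falling back to an assumed default scheme when the path has none. The real check looks at the scheme of the resolved Hadoop `FileSystem`, not at the path string.

```java
import java.net.URI;

/**
 * Hypothetical, simplified model of the guard quoted above (NOT Flink's implementation).
 * Flink checks the scheme of the resolved Hadoop FileSystem; this sketch approximates
 * that with the scheme of the sink path, plus an assumed default scheme.
 */
public class RecoverableWriterCheck {

    // Passes only if the scheme is "hdfs" (case-insensitive) AND Hadoop is 2.7 or newer.
    static boolean passesCheck(String scheme, int hadoopMajor, int hadoopMinor) {
        boolean isHdfs = "hdfs".equalsIgnoreCase(scheme);
        boolean minVersion = hadoopMajor > 2 || (hadoopMajor == 2 && hadoopMinor >= 7);
        return isHdfs && minVersion;
    }

    // A path without a scheme is resolved against a default file system;
    // defaultScheme is an assumed stand-in for whatever the cluster is configured with.
    static String effectiveScheme(String sinkPath, String defaultScheme) {
        String scheme = URI.create(sinkPath).getScheme();
        return scheme != null ? scheme : defaultScheme;
    }

    public static void main(String[] args) {
        String[] paths = {"hdfs://namenode:8020/out", "/out", "s3://my-bucket/out"};
        for (String path : paths) {
            // "file" is an example default scheme, chosen only for illustration.
            String scheme = effectiveScheme(path, "file");
            System.out.println(path + " -> " + scheme + " -> passes: " + passesCheck(scheme, 2, 8));
        }
    }
}
```

Run as-is, it reports `passes: true` only for the `hdfs://` path. In this model, an `s3://` sink path like the one in the question fails the scheme half of the check regardless of the Hadoop version.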


Another is that you’re in classpath hell, and your job jar bundles older Hadoop jars.
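One way to test the classpath theory is to ask the class loader for every copy of a Hadoop class it can see. The sketch below is a hypothetical diagnostic that uses only the JDK (`ClassLoader.getResources`); `org.apache.hadoop.util.VersionInfo` from hadoop-common is just a convenient probe class, and any class name can be passed as the first argument instead. It assumes you run it with the same classpath the job sees. More than one location suggests the job jar carries its own Hadoop classes alongside the cluster's.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic: lists every classpath location that provides a given class. */
public class ClasspathDuplicates {

    // "org.apache.hadoop.util.VersionInfo" -> "org/apache/hadoop/util/VersionInfo.class"
    static String resourceName(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Every jar or directory visible to the loader (and its parents) that contains the class file.
    static List<URL> locationsOf(String className, ClassLoader loader) {
        try {
            return Collections.list(loader.getResources(resourceName(className)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        String className = args.length > 0 ? args[0] : "org.apache.hadoop.util.VersionInfo";
        List<URL> locations = locationsOf(className, ClasspathDuplicates.class.getClassLoader());
        System.out.println(className + " found in " + locations.size() + " location(s):");
        for (URL url : locations) {
            System.out.println("  " + url);
        }
        if (locations.size() > 1) {
            System.out.println("WARNING: multiple copies on the classpath; check the job jar for bundled Hadoop classes");
        }
    }
}
```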

— Ken


> On Jun 11, 2019, at 12:16 AM, Yitzchak Lieberman <yitzchakl@sentinelone.com> wrote:
> 
> Hi.
> 
> I'm a bit confused:
> When I launch my Flink streaming application, which writes Kafka messages to S3 as Parquet files, on EMR release 5.24 (which ships Flink 1.8), I get the exception below. But when I install Flink 1.8 on EMR myself (a custom installation), it works.
> What could explain the difference in behavior?
> 
> Thanks,
> Yitzchak.
> 
> Caused by: java.lang.UnsupportedOperationException: Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer
> 	at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableWriter.<init>(HadoopRecoverableWriter.java:57)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.createRecoverableWriter(HadoopFileSystem.java:202)
> 	at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.createRecoverableWriter(SafetyNetWrapperFileSystem.java:69)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.<init>(Buckets.java:112)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink$BulkFormatBuilder.createBuckets(StreamingFileSink.java:317)
> 	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:327)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
> 	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:278)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:738)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:289)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> 	at java.lang.Thread.run(Thread.java:748)

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

