beam-commits mailing list archives

From "Luke Cwik (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2948) Unable to find registrar when restoring flink job from savepoint
Date Thu, 14 Sep 2017 17:40:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16166666#comment-16166666 ]

Luke Cwik commented on BEAM-2948:
---------------------------------

FileSystems assumes that it has been initialized early within each "worker". If a runner
does not honor that assumption, we may find other areas where this is also an issue.

Also, once this executes over the portability framework, it becomes a non-issue, because
all of this will happen in the SDK harness.
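
To illustrate the failure mode, here is a minimal, self-contained Java sketch. It does not
use Beam's classes: RegistrySketch, initialize, lookup, and decode are hypothetical names
that only model a per-JVM scheme registry which has to be populated before any decode path
consults it. The assumption is that a restore-time decode on a fresh worker behaves like the
first call in main below.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {
    // Hypothetical stand-in for a static, per-JVM scheme -> file system registry.
    private static final Map<String, String> SCHEME_TO_FS = new ConcurrentHashMap<>();

    /** Models the per-worker initialization step; replaces any earlier registrations. */
    public static void initialize(Map<String, String> configured) {
        SCHEME_TO_FS.clear();
        SCHEME_TO_FS.put("file", "LocalFileSystem");
        SCHEME_TO_FS.putAll(configured);
    }

    /** Resolves a file system name from the scheme of the given resource spec. */
    public static String lookup(String spec) {
        int idx = spec.indexOf("://");
        String scheme = idx < 0 ? "file" : spec.substring(0, idx);
        String fs = SCHEME_TO_FS.get(scheme);
        if (fs == null) {
            throw new IllegalStateException("Unable to find registrar for " + scheme);
        }
        return fs;
    }

    /** Models a coder whose decode path resolves a resource by scheme. */
    public static String decode(String encodedPath) {
        return lookup(encodedPath) + ":" + encodedPath;
    }

    public static void main(String[] args) {
        // Decoding before initialization fails, as on a worker that restores state first.
        try {
            decode("s3n://bucket/part-0");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // After initialization with the s3n scheme registered, the same decode succeeds.
        initialize(Collections.singletonMap("s3n", "HadoopFileSystem"));
        System.out.println(decode("s3n://bucket/part-0"));
    }
}
```

In this model the fix is purely one of ordering: whichever component deserializes state on a
worker has to run the initialization step first.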

> Unable to find registrar when restoring flink job from savepoint
> ----------------------------------------------------------------
>
>                 Key: BEAM-2948
>                 URL: https://issues.apache.org/jira/browse/BEAM-2948
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>         Environment: Flink v1.2.1 job on an EMR cluster using Beam v2.0.0
>            Reporter: Luke Cwik
>            Assignee: Aljoscha Krettek
>            Priority: Minor
>
> Reported: https://lists.apache.org/thread.html/d113707e84d7562e6d0d891830a8d85d76a497435105fe3ed1e06e13@%3Cdev.beam.apache.org%3E
> When I try to restore the job from a savepoint on one task manager, I get the
> exception *Unable to find registrar for s3n*.
> The job can write files to S3 when acting as a sink, so S3 access works except
> when restoring from a savepoint.
> I am passing the following configuration as the pipeline
> HadoopFileSystemOptions options:
> Configuration configuration = new Configuration();
> configuration.set("fs.defaultFS", String.format("s3n://%s", jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.default.name", String.format("s3n://%s", jobOptions.getOutputFileSystemRoot()));
> configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
> configuration.set("fs.s3n.awsAccessKeyId", jobOptions.getAwsAccessKey());
> configuration.set("fs.s3n.awsSecretAccessKey", jobOptions.getAwsSecretKey());
> From my investigation it looks like Beam's
> FileSystems.setDefaultPipelineOptions method is not called before
> org.apache.beam.sdk.io.FileBasedSink$FileResultCoder relies on an initialised
> FileSystems.SCHEME_TO_FILESYSTEM map:
> java.lang.IllegalStateException: Could not initialize keyed state backend.
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:293)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:204)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:653)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:640)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:246)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalStateException: Unable to find registrar for s3n
> at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:447)
> at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:523)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1059)
> at org.apache.beam.sdk.io.FileBasedSink$FileResultCoder.decode(FileBasedSink.java:1020)
> at org.apache.beam.runners.flink.translation.types.CoderTypeSerializer.deserialize(CoderTypeSerializer.java:87)
> at org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:87)
> at org.apache.flink.runtime.state.ArrayListSerializer.deserialize(ArrayListSerializer.java:27)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.readStateTableForKeyGroup(HeapKeyedStateBackend.java:370)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restorePartitionedState(HeapKeyedStateBackend.java:340)
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.restore(HeapKeyedStateBackend.java:243)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.createKeyedStateBackend(StreamTask.java:788)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initKeyedState(AbstractStreamOperator.java:284)
> ... 6 more



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
