flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3418) RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration
Date Mon, 22 Feb 2016 14:12:18 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15157029#comment-15157029 ]

ASF GitHub Bot commented on FLINK-3418:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1687#discussion_r53629014
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java ---
    @@ -26,32 +25,46 @@
     import java.io.File;
     import java.io.FileInputStream;
     import java.net.URI;
    +import java.util.ArrayList;
    +import java.util.List;
     
     /**
    - * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
    - * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
    + * Utility for copying from local file system to a HDFS {@link FileSystem}.
      */
     public class HDFSCopyFromLocal {
    -	public static void main(String[] args) throws Exception {
    -		String hadoopConfPath = args[0];
    -		String localBackupPath = args[1];
    -		String backupUri = args[2];
    -
    -		Configuration hadoopConf = new Configuration();
    -		try (DataInputStream in = new DataInputStream(new FileInputStream(hadoopConfPath))) {
    -			hadoopConf.readFields(in);
    -		}
     
    -		FileSystem fs = FileSystem.get(new URI(backupUri), hadoopConf);
    +	public static void copyFromLocal(final File hadoopConfPath, final File localPath, final URI remotePath) throws Exception {
    +		// Do it in another Thread because HDFS can deadlock if being interrupted while copying
     
    -		fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
    -	}
    +		String threadName = "HDFS Copy from " + localPath + " to " + remotePath;
    +
    +		final List<Exception> asyncException = new ArrayList<>();
    +
    +		Thread copyThread = new Thread(threadName) {
    +			@Override
    +			public void run() {
    +				try {
    +					Configuration hadoopConf = new Configuration();
    --- End diff --
    
    Ah dammit, I pushed the wrong stuff. Give me a sec.
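
For context, the pattern introduced by this diff is to run the blocking copy on a dedicated thread and hand any failure back to the caller afterwards, since {{FileSystem.copyFromLocalFile}} can deadlock when the thread executing it is interrupted. Below is a minimal, self-contained sketch of that pattern; the class name is illustrative, and it deliberately keeps the bare {{new Configuration()}} that this review comment flags as incomplete:

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

public class HdfsCopySketch {

	public static void copyFromLocal(final File localPath, final URI remotePath) throws Exception {
		// Run the copy on its own thread so that interrupts aimed at the caller
		// never reach FileSystem.copyFromLocalFile, which does not tolerate them.
		final List<Exception> asyncException = new ArrayList<>();

		Thread copyThread = new Thread("HDFS Copy from " + localPath + " to " + remotePath) {
			@Override
			public void run() {
				try {
					// Incomplete on purpose: a bare Configuration ignores the
					// cluster's Hadoop/security settings (the subject of this review).
					Configuration hadoopConf = new Configuration();
					FileSystem fs = FileSystem.get(remotePath, hadoopConf);
					fs.copyFromLocalFile(new Path(localPath.getAbsolutePath()), new Path(remotePath));
				} catch (Exception e) {
					asyncException.add(e);
				}
			}
		};

		copyThread.start();
		copyThread.join(); // join(), unlike the copy itself, may safely be interrupted

		// join() establishes a happens-before edge, so reading the list is safe here.
		if (!asyncException.isEmpty()) {
			throw asyncException.get(0);
		}
	}
}
{code}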


> RocksDB HDFSCopyFromLocal util doesn't respect our Hadoop security configuration
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-3418
>                 URL: https://issues.apache.org/jira/browse/FLINK-3418
>             Project: Flink
>          Issue Type: Bug
>          Components: state backends
>            Reporter: Robert Metzger
>            Assignee: Aljoscha Krettek
>            Priority: Blocker
>
> As you can see for example in the {{YARNTaskManagerRunner}}, our TaskManagers are running in a special UserGroupInformation.doAs() call.
> With that call, we are manually changing the user from the user starting the YARN NodeManager (our containers are part of that process tree) to the user who submitted the job.
> For example on my cluster, the NodeManager runs as "yarn", but "robert" submits the job. For regular file access, "robert" is accessing the files in HDFS, even though "yarn" runs the process.
> The {{HDFSCopyFromLocal}} does not properly initialize these settings, hence "yarn" tries to access the files, leading to the following exception:
> {code}
> Caused by: java.lang.RuntimeException: Error while copying to remote FileSystem: SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in [jar:file:/yarn/nm/usercache/robert/appcache/application_1455632128025_0010/filecache/17/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.5-1.cdh5.4.5.p0.7/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> Exception in thread "main" org.apache.hadoop.security.AccessControlException: Permission denied: user=yarn, access=WRITE, inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
> 	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> 	at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> 	at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> 	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2755)
> 	at org.apache.hadoop.hdfs.DFSClient.mkdirs(DFSClient.java:2724)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:870)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:866)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirsInternal(DistributedFileSystem.java:866)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.mkdirs(DistributedFileSystem.java:859)
> 	at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:1817)
> 	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:351)
> 	at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:338)
> 	at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1905)
> 	at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1873)
> 	at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1838)
> 	at org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal.main(HDFSCopyFromLocal.java:47)
> Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): Permission denied: user=yarn, access=WRITE, inode="/user/robert/rocksdb/5b7ad8b04048e894ef7bf341856681bf":robert:supergroup:drwxr-xr-x
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkFsPermission(DefaultAuthorizationProvider.java:257)
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:238)
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.check(DefaultAuthorizationProvider.java:216)
> 	at org.apache.hadoop.hdfs.server.namenode.DefaultAuthorizationProvider.checkPermission(DefaultAuthorizationProvider.java:145)
> 	at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:138)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6599)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkPermission(FSNamesystem.java:6581)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkAncestorAccess(FSNamesystem.java:6533)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInternal(FSNamesystem.java:4337)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirsInt(FSNamesystem.java:4307)
> 	at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.mkdirs(FSNamesystem.java:4280)
> 	at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.mkdirs(NameNodeRpcServer.java:853)
> 	at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.mkdirs(AuthorizationProviderProxyClientProtocol.java:321)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.mkdirs(ClientNamenodeProtocolServerSideTranslatorPB.java:601)
> 	at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
> 	at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044)
> 	at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040)
> 	at java.security.AccessController.doPrivileged(Native Method)
> 	at javax.security.auth.Subject.doAs(Subject.java:415)
> 	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
> 	at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1468)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1399)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> 	at com.sun.proxy.$Proxy9.mkdirs(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.mkdirs(ClientNamenodeProtocolTranslatorPB.java:539)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> 	at com.sun.proxy.$Proxy10.mkdirs(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.primitiveMkdir(DFSClient.java:2753)
> 	... 13 more
> 	at org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal.copyFromLocal(HDFSCopyFromLocal.java:54)
> 	at org.apache.flink.contrib.streaming.state.AbstractRocksDBState$AsyncRocksDBSnapshot.materialize(AbstractRocksDBState.java:454)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$1.run(StreamTask.java:531)
> {code}
> I think we need to fix this before the 1.0.0 release.
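
For reference, the user-switching mechanism the issue describes is Hadoop's {{UserGroupInformation.doAs()}}. A minimal sketch of performing the copy under the submitting user's UGI (names here are illustrative, not the actual fix in the pull request):

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

import java.net.URI;
import java.security.PrivilegedExceptionAction;

public class SecureHdfsCopySketch {

	public static void copyAsCurrentUser(final String localPath, final URI remotePath,
			final Configuration hadoopConf) throws Exception {
		// Inside the TaskManager this resolves to the submitting user (e.g. "robert"),
		// because the surrounding code already runs in that user's doAs() scope.
		UserGroupInformation ugi = UserGroupInformation.getCurrentUser();

		// Everything inside doAs() executes as that user, so the NameNode checks
		// permissions against "robert" instead of the NodeManager user "yarn".
		ugi.doAs(new PrivilegedExceptionAction<Void>() {
			@Override
			public Void run() throws Exception {
				FileSystem fs = FileSystem.get(remotePath, hadoopConf);
				fs.copyFromLocalFile(new Path(localPath), new Path(remotePath));
				return null;
			}
		});
	}
}
{code}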



