tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bi...@apache.org
Subject git commit: Addendum for client side changes TEZ-607. Client DAG AM communication broken for token based security (bikas)
Date Sat, 09 Nov 2013 17:31:40 GMT
Updated Branches:
  refs/heads/master 8a259dfc3 -> 13fdd0580


Addendum for client side changes TEZ-607. Client DAG AM communication broken for token based
security (bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/13fdd058
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/13fdd058
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/13fdd058

Branch: refs/heads/master
Commit: 13fdd05807cd3d851dac2eae06886232a093c61b
Parents: 8a259df
Author: Bikas Saha <bikas@apache.org>
Authored: Sat Nov 9 09:21:56 2013 -0800
Committer: Bikas Saha <bikas@apache.org>
Committed: Sat Nov 9 09:21:56 2013 -0800

----------------------------------------------------------------------
 .../dag/api/client/rpc/DAGClientRPCImpl.java    | 40 ++++++++++++++++----
 1 file changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/13fdd058/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
index 06cebca..07074a2 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientRPCImpl.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.api.client.rpc;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Set;
 
@@ -27,6 +28,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@@ -34,6 +38,8 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.tez.dag.api.DagTypeConverters;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
@@ -300,14 +306,32 @@ public class DAGClientRPCImpl implements DAGClient {
       // attempt not running
       return false;
     }
-
-    InetSocketAddress addr = new InetSocketAddress(appReport.getHost(),
-        appReport.getRpcPort());
-
-    RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
-        ProtobufRpcEngine.class);
-    proxy = (DAGClientAMProtocolBlockingPB) RPC.getProxy(
-        DAGClientAMProtocolBlockingPB.class, 0, addr, conf);
+    
+    UserGroupInformation newUgi = UserGroupInformation.createRemoteUser(
+        UserGroupInformation.getCurrentUser().getUserName());
+    final InetSocketAddress serviceAddr = NetUtils.createSocketAddrForHost(
+        appReport.getHost(), appReport.getRpcPort());
+    org.apache.hadoop.yarn.api.records.Token clientToAMToken =
+        appReport.getClientToAMToken();
+    if (clientToAMToken != null) {
+      Token<ClientToAMTokenIdentifier> token =
+          ConverterUtils.convertFromYarn(clientToAMToken, serviceAddr);
+      newUgi.addToken(token);
+    }
+    LOG.debug("Connecting to " + serviceAddr);
+    try {
+      proxy = newUgi.doAs(new PrivilegedExceptionAction<DAGClientAMProtocolBlockingPB>()
{
+        @Override
+        public DAGClientAMProtocolBlockingPB run() throws IOException {
+          RPC.setProtocolEngine(conf, DAGClientAMProtocolBlockingPB.class,
+              ProtobufRpcEngine.class);
+          return (DAGClientAMProtocolBlockingPB) RPC.getProxy(
+              DAGClientAMProtocolBlockingPB.class, 0, serviceAddr, conf);
+        }
+      });
+    } catch (InterruptedException e) {
+      throw new TezException(e);
+    }
     return true;
   }
 


Mime
View raw message