hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r905875 - in /hadoop/mapreduce/trunk: ./ .eclipse.templates/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/protocol/ src/java/org/apache/hadoop/mapreduce/security/ src/java/org/apache/hadoop/mapreduce/security/toke...
Date Wed, 03 Feb 2010 03:06:56 GMT
Author: ddas
Date: Wed Feb  3 03:06:52 2010
New Revision: 905875

URL: http://svn.apache.org/viewvc?rev=905875&view=rev
Log:
MAPREDUCE-1335. Adds SASL Kerberos/Digest authentication in MapReduce. Contributed by Kan
Zhang.

Added:
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Modified:
    hadoop/mapreduce/trunk/.eclipse.templates/.classpath
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java

Modified: hadoop/mapreduce/trunk/.eclipse.templates/.classpath
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/.eclipse.templates/.classpath?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/.eclipse.templates/.classpath (original)
+++ hadoop/mapreduce/trunk/.eclipse.templates/.classpath Wed Feb  3 03:06:52 2010
@@ -57,6 +57,7 @@
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/slf4j-api-1.5.8.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/slf4j-simple-1.5.8.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/Hadoop/common/xmlenc-0.52.jar"/>
+        <classpathentry kind="lib" path="build/ivy/lib/Hadoop/test/mockito-all-1.8.0.jar"/>
 	<classpathentry kind="lib" path="build/ivy/lib/index/common/lucene-core-2.3.1.jar"/>
 	<classpathentry kind="lib" path="build/test/classes"/>
 	<classpathentry kind="lib" path="build/classes"/>

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Feb  3 03:06:52 2010
@@ -46,6 +46,9 @@
     for loading the tokens in the user's ugi. This is required
     for the copying of files from the hdfs. (ddas)
 
+    MAPREDUCE-1335. Adds SASL Kerberos/Digest authentication in MapReduce.
+    (Kan Zhang via ddas)
+
   IMPROVEMENTS
 
     MAPREDUCE-1198. Alternatively schedule different types of tasks in

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/AdminOperationsProtocol.java
Wed Feb  3 03:06:52 2010
@@ -21,11 +21,14 @@
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.KerberosInfo;
 
 /**
  * Protocol for admin operations. This is a framework-public interface and is
  * NOT_TO_BE_USED_BY_USERS_DIRECTLY.
  */
+@KerberosInfo(JobContext.JOB_JOBTRACKER_ID)
 public interface AdminOperationsProtocol extends VersionedProtocol {
   
   /**

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/Child.java Wed Feb  3 03:06:52
2010
@@ -28,18 +28,19 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.jvm.JvmMetrics;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.LogManager;
@@ -79,6 +80,11 @@
       TokenCache.loadTaskTokenStorage(jobTokenFile, defaultConf);
     LOG.debug("loading token. # keys =" +ts.numberOfSecretKeys() + 
         "; from file=" + jobTokenFile);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
+    jt.setService(new Text(address.getAddress().getHostAddress() + ":"
+        + address.getPort()));
+    UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    current.addToken(jt);
     
     TaskUmbilicalProtocol umbilical =
       (TaskUmbilicalProtocol)RPC.getProxy(TaskUmbilicalProtocol.class,
@@ -157,8 +163,6 @@
         TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
         
-        // set job shuffle token
-        Token<? extends TokenIdentifier> jt = TokenCache.getJobToken(ts);
         // set the jobTokenFile into task
         task.setJobTokenSecret(JobTokenSecretManager.
                                createSecretKey(jt.getPassword()));

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed
Feb  3 03:06:52 2010
@@ -21,11 +21,14 @@
 import java.io.IOException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.security.KerberosInfo;
 
 /** 
  * Protocol that a TaskTracker and the central JobTracker use to communicate.
  * The JobTracker is the Server, which implements this protocol.
  */ 
+@KerberosInfo(JobContext.JOB_JOBTRACKER_ID)
 interface InterTrackerProtocol extends VersionedProtocol {
   /**
    * version 3 introduced to replace 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Feb  3 03:06:52
2010
@@ -653,8 +653,8 @@
                        maxMapSlots : maxReduceSlots;
     //set the num handlers to max*2 since canCommit may wait for the duration
     //of a heartbeat RPC
-    this.taskReportServer =
-      RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf);
+    this.taskReportServer = RPC.getServer(this.getClass(), this, bindAddress,
+        tmpPort, 2 * max, false, this.fConf, this.jobTokenSecretManager);
     this.taskReportServer.start();
 
     // get the assigned address
@@ -987,7 +987,6 @@
    *         job as a starting point.
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
   JobConf localizeJobFiles(Task t, RunningJob rjob)
       throws IOException, InterruptedException {
     JobID jobId = t.getJobID();
@@ -1001,8 +1000,7 @@
     rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
 
     TokenStorage ts = TokenCache.loadTokens(localJobTokenFile, fConf);
-    Token<JobTokenIdentifier> jt = 
-      (Token<JobTokenIdentifier>)TokenCache.getJobToken(ts);
+    Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
     if (jt != null) { //could be null in the case of some unit tests
       getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
     }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed
Feb  3 03:06:52 2010
@@ -22,11 +22,14 @@
 
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapred.JvmTask;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSelector;
+import org.apache.hadoop.security.token.TokenInfo;
 
 /** Protocol that task child process uses to contact its parent process.  The
  * parent is a daemon which which polls the central master for a new map or
  * reduce task and runs it as a child process.  All communication between child
  * and parent is via this protocol. */ 
+@TokenInfo(JobTokenSelector.class)
 public interface TaskUmbilicalProtocol extends VersionedProtocol {
 
   /** 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/protocol/ClientProtocol.java
Wed Feb  3 03:06:52 2010
@@ -24,6 +24,7 @@
 import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.QueueAclsInfo;
@@ -35,12 +36,14 @@
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.mapreduce.server.jobtracker.State;
+import org.apache.hadoop.security.KerberosInfo;
 
 /** 
  * Protocol that a JobClient and the central JobTracker use to communicate.  The
  * JobClient can use these methods to submit a Job for execution, and learn about
  * the current system status.
  */ 
+@KerberosInfo(JobContext.JOB_JOBTRACKER_ID)
 public interface ClientProtocol extends VersionedProtocol {
   /* 
    *Changing the versionID to 2L since the getTaskCompletionEvents method has

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/TokenCache.java Wed
Feb  3 03:06:52 2010
@@ -36,6 +36,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.TokenStorage;
 import org.apache.hadoop.security.token.Token;
@@ -230,9 +231,10 @@
    * 
    * @return job token
    */
+  @SuppressWarnings("unchecked")
   @InterfaceAudience.Private
-  public static Token<? extends TokenIdentifier> getJobToken(TokenStorage ts) {
-    return ts.getToken(JOB_TOKEN);
+  public static Token<JobTokenIdentifier> getJobToken(TokenStorage ts) {
+    return (Token<JobTokenIdentifier>) ts.getToken(JOB_TOKEN);
   }
   
   static String buildDTServiceName(URI uri) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenIdentifier.java
Wed Feb  3 03:06:52 2010
@@ -35,6 +35,13 @@
   final static Text KIND_NAME = new Text("mapreduce.job");
   
   /**
+   * Default constructor
+   */
+  public JobTokenIdentifier() {
+    this.jobid = new Text();
+  }
+
+  /**
    * Create a job token identifier from a jobid
    * @param jobid the jobid to use
    */
@@ -48,6 +55,12 @@
     return KIND_NAME;
   }
   
+  /** {@inheritDoc} */
+  @Override
+  public Text getUsername() {
+    return getJobId();
+  }
+  
   /**
    * Get the jobid
    * @return the jobid

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java?rev=905875&r1=905874&r2=905875&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
(original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSecretManager.java
Wed Feb  3 03:06:52 2010
@@ -124,4 +124,12 @@
     return retrieveTokenSecret(identifier.getJobId().toString()).getEncoded();
   }
 
+  /**
+   * Create an empty job token identifier
+   * @return a newly created empty job token identifier
+   */
+  @Override
+  public JobTokenIdentifier createIdentifier() {
+    return new JobTokenIdentifier();
+  }
 }

Added: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java?rev=905875&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
(added)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/security/token/JobTokenSelector.java
Wed Feb  3 03:06:52 2010
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security.token;
+
+import java.util.Collection;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/**
+ * Look through tokens to find the first job token that matches the service
+ * and return it.
+ */
+@InterfaceAudience.Private
+public class JobTokenSelector implements TokenSelector<JobTokenIdentifier> {
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Token<JobTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (JobTokenIdentifier.KIND_NAME.equals(token.getKind())
+          && service.equals(token.getService())) {
+        return (Token<JobTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java?rev=905875&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
(added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
Wed Feb  3 03:06:52 2010
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.*;
+import org.apache.commons.logging.impl.Log4JLogger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+/** Unit tests for using Job Token over RPC. */
+public class TestUmbilicalProtocolWithJobToken {
+  private static final String ADDRESS = "0.0.0.0";
+
+  public static final Log LOG = LogFactory
+      .getLog(TestUmbilicalProtocolWithJobToken.class);
+
+  private static Configuration conf;
+  static {
+    conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+  }
+
+  static {
+    ((Log4JLogger) Client.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) Server.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslRpcClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslRpcServer.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger) SaslInputStream.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  @Test
+  public void testJobTokenRpc() throws Exception {
+    TaskUmbilicalProtocol mockTT = mock(TaskUmbilicalProtocol.class);
+    when(mockTT.getProtocolVersion(anyString(), anyLong())).thenReturn(
+        TaskUmbilicalProtocol.versionID);
+
+    JobTokenSecretManager sm = new JobTokenSecretManager();
+    final Server server = RPC.getServer(TaskUmbilicalProtocol.class, mockTT,
+        ADDRESS, 0, 5, true, conf, sm);
+
+    server.start();
+
+    final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+    final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+    String jobId = current.getUserName();
+    JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(jobId));
+    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(tokenId,
sm);
+    sm.addTokenForJob(jobId, token);
+    Text host = new Text(addr.getAddress().getHostAddress() + ":"
+        + addr.getPort());
+    token.setService(host);
+    LOG.info("Service IP address for token is " + host);
+    current.addToken(token);
+    current.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        TaskUmbilicalProtocol proxy = null;
+        try {
+          proxy = (TaskUmbilicalProtocol) RPC.getProxy(
+              TaskUmbilicalProtocol.class, TaskUmbilicalProtocol.versionID,
+              addr, conf);
+          proxy.ping(null);
+        } finally {
+          server.stop();
+          if (proxy != null) {
+            RPC.stopProxy(proxy);
+          }
+        }
+        return null;
+      }
+    });
+  }
+
+}



Mime
View raw message