tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-896. Fix AM splits to work on secure clusters when using the mapred API. (sseth)
Date Thu, 27 Feb 2014 00:54:27 GMT
Repository: incubator-tez
Updated Branches:
  refs/heads/master a9bace052 -> 444a96355


TEZ-896. Fix AM splits to work on secure clusters when using the mapred
API. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/444a9635
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/444a9635
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/444a9635

Branch: refs/heads/master
Commit: 444a96355fca50ad51e20da336ff71a6db6dc6b0
Parents: a9bace0
Author: Siddharth Seth <sseth@apache.org>
Authored: Wed Feb 26 16:53:58 2014 -0800
Committer: Siddharth Seth <sseth@apache.org>
Committed: Wed Feb 26 16:53:58 2014 -0800

----------------------------------------------------------------------
 .../org/apache/tez/client/TezClientUtils.java   |  3 ++
 .../org/apache/tez/common/impl/LogUtils.java    | 42 ++++++++++++++++++++
 .../main/java/org/apache/tez/dag/api/DAG.java   | 17 +++++---
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  8 +++-
 .../mapreduce/committer/MROutputCommitter.java  |  3 ++
 .../common/MRInputAMSplitGenerator.java         | 14 ++++---
 .../org/apache/tez/mapreduce/input/MRInput.java |  3 ++
 .../apache/tez/mapreduce/output/MROutput.java   |  3 ++
 8 files changed, 80 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
index 82cb0c7..ff45cf5 100644
--- a/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
+++ b/tez-api/src/main/java/org/apache/tez/client/TezClientUtils.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.log4j.Level;
+import org.apache.tez.common.impl.LogUtils;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -247,6 +248,8 @@ public class TezClientUtils {
       Configuration conf) throws IOException {
 
     Preconditions.checkNotNull(sessionCredentials);
+    LogUtils.logCredentials(LOG, sessionCredentials, "session");
+
     Credentials dagCredentials = dag.getCredentials();
     if (dagCredentials == null) {
       dagCredentials = new Credentials();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-api/src/main/java/org/apache/tez/common/impl/LogUtils.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/impl/LogUtils.java b/tez-api/src/main/java/org/apache/tez/common/impl/LogUtils.java
new file mode 100644
index 0000000..d5e324e
--- /dev/null
+++ b/tez-api/src/main/java/org/apache/tez/common/impl/LogUtils.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.common.impl;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+
+@Private
+public class LogUtils {
+
+  public static void logCredentials(Log log, Credentials credentials, String identifier) {
+    if (log.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder();
+      sb.append("#" + identifier + "Tokens=").append(credentials.numberOfTokens());
+      if (credentials.numberOfTokens() > 0) {
+        sb.append(", Services: ");
+        for (Token<?> t : credentials.getAllTokens()) {
+          sb.append(t.getService()).append(",");
+        }
+      }
+      log.debug(sb.toString());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
index 6433dc8..fb82f08 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/DAG.java
@@ -24,43 +24,48 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Stack;
 
+import org.apache.commons.collections4.BidiMap;
+import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.tez.common.impl.LogUtils;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
+import org.apache.tez.dag.api.VertexGroup.GroupInfo;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
-import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
 import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
+import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.collections4.bidimap.DualLinkedHashBidiMap;
-import org.apache.commons.collections4.BidiMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class DAG { // FIXME rename to Topology
+  
+  private static final Log LOG = LogFactory.getLog(DAG.class);
+  
   final BidiMap<String, Vertex> vertices = 
       new DualLinkedHashBidiMap<String, Vertex>();
   final Set<Edge> edges = Sets.newHashSet();
@@ -604,8 +609,8 @@ public class DAG { // FIXME rename to Topology
     }
     if (credentials != null) {
       dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
+      LogUtils.logCredentials(LOG, credentials, "dag");
     }
-
     return dagBuilder.build();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index cafcd3c..6098289 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.client.PreWarmContext;
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.impl.LogUtils;
 import org.apache.tez.common.security.JobTokenIdentifier;
 import org.apache.tez.common.security.TokenCache;
 import org.apache.tez.dag.api.DagTypeConverters;
@@ -132,9 +133,10 @@ import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
 import org.apache.tez.dag.utils.Graph;
 import org.apache.tez.runtime.library.common.security.JobTokenSecretManager;
-import com.google.common.annotations.VisibleForTesting;
 import org.codehaus.jettison.json.JSONException;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -541,7 +543,9 @@ public class DAGAppMaster extends AbstractService {
 
     Credentials dagCredentials = null;
     if (dagPB.hasCredentialsBinary()) {
-      dagCredentials = DagTypeConverters.convertByteStringToCredentials(dagPB.getCredentialsBinary());
+      dagCredentials = DagTypeConverters.convertByteStringToCredentials(dagPB
+          .getCredentialsBinary());
+      LogUtils.logCredentials(LOG, dagCredentials, "dag");
     } else {
       dagCredentials = new Credentials();
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
index d21bd96..03f28be 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/committer/MROutputCommitter.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.client.VertexStatus;
@@ -61,6 +62,8 @@ public class MROutputCommitter extends OutputCommitter {
           MRHelpers.createConfFromUserPayload(context.getUserPayload()));
     }
 
+    // Read all credentials into the credentials instance stored in JobConf.
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
     jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
         context.getDAGAttemptNumber());
     committer = getOutputCommitter(context);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
index 414778f..45f46bb 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/common/MRInputAMSplitGenerator.java
@@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
@@ -74,6 +75,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
     }
     Configuration conf = MRHelpers.createConfFromByteString(userPayloadProto
         .getConfigurationBytes());
+    
     sendSerializedEvents = conf.getBoolean(
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD,
         MRJobConfig.MR_TEZ_INPUT_INITIALIZER_SERIALIZE_EVENT_PAYLAOD_DEFAULT);
@@ -99,15 +101,17 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
         + " tasks. Headroom: " + totalResource + " Task Resource: "
         + taskResource + " waves: " + waves);
 
+    // Read all credentials into the credentials instance stored in JobConf.
+    JobConf jobConf = new JobConf(conf);
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
 
     InputSplitInfoMem inputSplitInfo = null;
     String realInputFormatName = userPayloadProto.getInputFormatName(); 
     if ( realInputFormatName != null && !realInputFormatName.isEmpty()) {
       // split grouping on the AM
-      JobConf jobConf = new JobConf(conf);
       if (jobConf.getUseNewMapper()) {
         LOG.info("Grouping mapreduce api input splits");
-        Job job = Job.getInstance(conf);
+        Job job = Job.getInstance(jobConf);
         org.apache.hadoop.mapreduce.InputSplit[] splits = MRHelpers
             .generateNewSplits(job, realInputFormatName, numTasks);
 
@@ -129,7 +133,7 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
                 Collections.singleton(rack)));
           }
         }
-        inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, conf);
+        inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, jobConf);
       } else {
         LOG.info("Grouping mapred api input splits");
         org.apache.hadoop.mapred.InputSplit[] splits = MRHelpers
@@ -151,10 +155,10 @@ public class MRInputAMSplitGenerator implements TezRootInputInitializer {
                 Collections.singleton(rack)));
           }
         }
-        inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, conf);
+        inputSplitInfo = new InputSplitInfoMem(splits, locationHints, splits.length, null, jobConf);
       }
     } else {
-      inputSplitInfo = MRHelpers.generateInputSplitsToMem(conf);
+      inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
     }
     if (LOG.isDebugEnabled()) {
       sw.stop();

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
index 383b952..34b5527 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/input/MRInput.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
@@ -129,6 +130,8 @@ public class MRInput implements LogicalInput {
     Configuration conf =
       MRHelpers.createConfFromByteString(mrUserPayload.getConfigurationBytes());
     this.jobConf = new JobConf(conf);
+    // Add tokens to the jobConf - in case they are accessed within the RR / IF
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
 
     TaskAttemptID taskAttemptId = new TaskAttemptID(
       new TaskID(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/444a9635/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index f9405f9..25a1e0f 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
@@ -99,6 +100,8 @@ public class MROutput implements LogicalOutput {
     Configuration conf = TezUtils.createConfFromUserPayload(
         outputContext.getUserPayload());
     this.jobConf = new JobConf(conf);
+    // Add tokens to the jobConf - in case they are accessed within the RW / OF
+    jobConf.getCredentials().mergeAll(UserGroupInformation.getCurrentUser().getCredentials());
     this.useNewApi = this.jobConf.getUseNewMapper();
     this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
         false);


Mime
View raw message