hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vik...@apache.org
Subject svn commit: r1585809 - in /hive/branches/branch-0.13: common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/a...
Date Tue, 08 Apr 2014 18:22:02 GMT
Author: vikram
Date: Tue Apr  8 18:22:02 2014
New Revision: 1585809

URL: http://svn.apache.org/r1585809
Log:
HIVE-6782 : HiveServer2Concurrency issue when running with tez intermittently, throwing org.apache.tez.dag.api.SessionNotRunning:
Application not running error (Vikram Dixit K, reviewed by Thejas Nair)

Modified:
    hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
    hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/branch-0.13/conf/hive-default.xml.template
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Context.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java

Modified: hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Tue Apr  8 18:22:02 2014
@@ -441,6 +441,4 @@ public final class FileUtils {
     }
     return true;
   }
-
-
 }

Modified: hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Apr  8 18:22:02 2014
@@ -1017,7 +1017,8 @@ public class HiveConf extends Configurat
     // Check if a plan contains a Cross Product.
     // If there is one, output a warning to the Session's console.
     HIVE_CHECK_CROSS_PRODUCT("hive.exec.check.crossproducts", true),
-
+    HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL("hive.localize.resource.wait.interval", 5000L), // in ms
+    HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS("hive.localize.resource.num.wait.attempts", 5),
     ;
 
     public final String varname;

Modified: hive/branches/branch-0.13/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/conf/hive-default.xml.template?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/conf/hive-default.xml.template (original)
+++ hive/branches/branch-0.13/conf/hive-default.xml.template Tue Apr  8 18:22:02 2014
@@ -2593,4 +2593,20 @@
   </description>
 </property>
 
+<property>
+  <name>hive.localize.resource.wait.interval</name>
+  <value>5000</value>
+  <description>
+    Time in milliseconds to wait for another thread to localize the same resource for hive-tez.
+  </description>
+</property>
+
+<property>
+  <name>hive.localize.resource.num.wait.attempts</name>
+  <value>5</value>
+  <description>
+    The number of attempts waiting for localizing a resource in hive-tez.
+  </description>
+</property>
+
 </configuration>

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Context.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Context.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Context.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/Context.java Tue Apr  8 18:22:02 2014
@@ -30,13 +30,16 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.TaskRunner;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.DataInput;
@@ -51,6 +54,8 @@ import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 
+import javax.security.auth.login.LoginException;
+
 /**
  * Context for Semantic Analyzers. Usage: not reusable - construct a new one for
  * each query should call clear() at end of use to remove temporary folders
@@ -204,12 +209,11 @@ public class Context {
         try {
           FileSystem fs = dirPath.getFileSystem(conf);
           dirPath = new Path(fs.makeQualified(dirPath).toString());
-          if (!fs.mkdirs(dirPath)) {
+          FsPermission fsPermission = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
+
+          if (!Utilities.createDirsWithPermission(conf, dirPath, fsPermission)) {
             throw new RuntimeException("Cannot make directory: "
                                        + dirPath.toString());
-          } else {
-            FsPermission fsPermission = new FsPermission(Short.parseShort(scratchDirPermission.trim(), 8));
-            fs.setPermission(dirPath, fsPermission);
           }
           if (isHDFSCleanup) {
             fs.deleteOnExit(dirPath);
@@ -222,6 +226,7 @@ public class Context {
       fsScratchDirs.put(fileSystem + "-" + TaskRunner.getTaskRunnerID(), dir);
 
     }
+
     return dir;
   }
 
@@ -256,6 +261,7 @@ public class Context {
     try {
       Path dir = FileUtils.makeQualified(nonLocalScratchPath, conf);
       URI uri = dir.toUri();
+
       Path newScratchDir = getScratchDir(uri.getScheme(), uri.getAuthority(),
                            !explain, uri.getPath());
       LOG.info("New scratch dir is " + newScratchDir);

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Apr  8 18:22:02 2014
@@ -82,6 +82,8 @@ import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
+import javax.security.auth.login.LoginException;
+
 import org.antlr.runtime.CommonToken;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
@@ -95,6 +97,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -175,6 +179,7 @@ import org.apache.hadoop.mapred.RecordRe
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
@@ -317,7 +322,7 @@ public final class Utilities {
           LOG.debug("Loading plan from string: "+path.toUri().getPath());
           String planString = conf.get(path.toUri().getPath());
           if (planString == null) {
-            LOG.debug("Could not find plan string in conf");
+            LOG.info("Could not find plan string in conf");
             return null;
           }
           byte[] planBytes = Base64.decodeBase64(planString);
@@ -356,7 +361,7 @@ public final class Utilities {
       return gWork;
     } catch (FileNotFoundException fnf) {
       // happens. e.g.: no reduce work.
-      LOG.debug("No plan file found: "+path);
+      LOG.info("No plan file found: "+path);
       return null;
     } catch (Exception e) {
       LOG.error("Failed to load plan: "+path, e);
@@ -3235,6 +3240,7 @@ public final class Utilities {
   private static void createTmpDirs(Configuration conf,
       List<Operator<? extends OperatorDesc>> ops) throws IOException {
 
+    FsPermission fsPermission = new FsPermission((short)00777);
     while (!ops.isEmpty()) {
       Operator<? extends OperatorDesc> op = ops.remove(0);
 
@@ -3244,8 +3250,7 @@ public final class Utilities {
 
         if (tempDir != null) {
           Path tempPath = Utilities.toTempPath(tempDir);
-          FileSystem fs = tempPath.getFileSystem(conf);
-          fs.mkdirs(tempPath);
+          createDirsWithPermission(conf, tempPath, fsPermission);
         }
       }
 
@@ -3367,4 +3372,49 @@ public final class Utilities {
     }
     return footerCount;
   }
+
+  /**
+   * @param conf the configuration used to derive the filesystem to create the path
+   * @param mkdir the path to be created
+   * @param fsPermission ignored if it is hive server session and doAs is enabled
+   * @return true if successfully created the directory else false
+   * @throws IOException if hdfs experiences any error conditions
+   */
+  public static boolean createDirsWithPermission(Configuration conf, Path mkdir,
+      FsPermission fsPermission) throws IOException {
+    // this umask is required because by default the hdfs mask is 022 resulting in
+    // all parents getting the fsPermission & !(022) permission instead of fsPermission
+    boolean recursive = false;
+    if (SessionState.get() != null) {
+      recursive = SessionState.get().isHiveServerQuery() &&
+          conf.getBoolean(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname,
+              HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.defaultBoolVal);
+      // we reset the permission in case of hive server and doAs enabled because
+      // currently scratch directory uses /tmp/hive-hive as the scratch directory.
+      // However, with doAs enabled, the first user to create this directory would
+      // own the directory and subsequent users cannot access the scratch directory.
+      // The right fix is to have scratch dir per user.
+      fsPermission = new FsPermission((short)00777);
+    }
+
+    // if we made it so far without exception we are good!
+    return createDirsWithPermission(conf, mkdir, fsPermission, recursive);
+  }
+
+  public static boolean createDirsWithPermission(Configuration conf, Path mkdir,
+      FsPermission fsPermission, boolean recursive) throws IOException {
+    String origUmask = null;
+    if (recursive) {
+      origUmask = conf.get("fs.permissions.umask-mode");
+      conf.set("fs.permissions.umask-mode", "000");
+    }
+    FileSystem fs = mkdir.getFileSystem(conf);
+    boolean retval = fs.mkdirs(mkdir, fsPermission);
+    if (origUmask != null) {
+      conf.set("fs.permissions.umask-mode", origUmask);
+    } else {
+      conf.unset("fs.permissions.umask-mode");
+    }
+    return retval;
+  }
 }

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Apr  8 18:22:02 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -111,7 +112,6 @@ import org.apache.tez.mapreduce.input.MR
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
 import org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileSortedOutput;
@@ -571,6 +571,7 @@ public class DagUtils {
     URL resourceURL = ConverterUtils.getYarnUrlFromPath(file);
     long resourceSize = fstat.getLen();
     long resourceModificationTime = fstat.getModificationTime();
+    LOG.info("Resource modification time: " + resourceModificationTime);
 
     LocalResource lr = Records.newRecord(LocalResource.class);
     lr.setResource(resourceURL);
@@ -713,9 +714,9 @@ public class DagUtils {
       if (!StringUtils.isNotBlank(file)) {
         continue;
       }
-      String hdfsFilePathStr = hdfsDirPathStr + "/" + getResourceBaseName(file);
+      Path hdfsFilePath = new Path(hdfsDirPathStr, getResourceBaseName(new Path(file)));
       LocalResource localResource = localizeResource(new Path(file),
-          new Path(hdfsFilePathStr), conf);
+          hdfsFilePath, conf);
       tmpResources.add(localResource);
     }
   }
@@ -760,10 +761,8 @@ public class DagUtils {
    * @param pathStr - the string from which we try to determine the resource base name
    * @return the name of the resource from a given path string.
    */
-  public String getResourceBaseName(String pathStr) {
-    // TODO: this should probably use Path::getName
-    String[] splits = pathStr.split("/");
-    return splits[splits.length - 1];
+  public String getResourceBaseName(Path path) {
+    return path.getName();
   }
 
   /**
@@ -776,22 +775,10 @@ public class DagUtils {
   private boolean checkPreExisting(Path src, Path dest, Configuration conf)
     throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
-
-    if (!destFS.exists(dest)) {
-      return false;
-    }
-    FileStatus destStatus = destFS.getFileStatus(dest);
-    if (destStatus.isDir()) {
-      return false;
-    }
-
-    String srcName = getResourceBaseName(src.toString());
-    String destName = getResourceBaseName(dest.toString());
-
-    if (srcName.equals(destName)) {
-      return true;
+    FileSystem sourceFS = src.getFileSystem(conf);
+    if (destFS.exists(dest)) {
+      return (sourceFS.getFileStatus(src).getLen() == destFS.getFileStatus(dest).getLen());
     }
-
     return false;
   }
 
@@ -810,10 +797,39 @@ public class DagUtils {
     }
 
     if (src != null) {
-      if (!checkPreExisting(src, dest, conf)) {
-        // copy the src to the destination and create local resource.
-        // overwrite even if file already exists.
-        destFS.copyFromLocalFile(false, true, src, dest);
+      // copy the src to the destination and create local resource.
+      // do not overwrite.
+      LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest);
+      try {
+        destFS.copyFromLocalFile(false, false, src, dest);
+      } catch (IOException e) {
+        LOG.info("Looks like another thread is writing the same file will wait.");
+        int waitAttempts =
+            conf.getInt(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.varname,
+                HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_NUM_WAIT_ATTEMPTS.defaultIntVal);
+        long sleepInterval =
+            conf.getLong(HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.varname,
+                HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL.defaultLongVal);
+        LOG.info("Number of wait attempts: " + waitAttempts + ". Wait interval: "
+            + sleepInterval);
+        boolean found = false;
+        for (int i = 0; i < waitAttempts; i++) {
+          if (!checkPreExisting(src, dest, conf)) {
+            try {
+              Thread.currentThread().sleep(sleepInterval);
+            } catch (InterruptedException interruptedException) {
+              throw new IOException(interruptedException);
+            }
+          } else {
+            found = true;
+            break;
+          }
+        }
+        if (!found) {
+          LOG.error("Could not find the jar that was being uploaded");
+          throw new IOException("Previous writer likely failed to write " + dest +
+              ". Failing because I am unlikely to write too.");
+        }
       }
     }
 
@@ -948,11 +964,27 @@ public class DagUtils {
    * be used with Tez. Assumes scratchDir exists.
    */
   public Path createTezDir(Path scratchDir, Configuration conf)
-    throws IOException {
+      throws IOException {
+    UserGroupInformation ugi;
+    String userName = System.getProperty("user.name");
+    try {
+      ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+      userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+    } catch (LoginException e) {
+      throw new IOException(e);
+    }
+
+    scratchDir = new Path(scratchDir, userName);
+
     Path tezDir = getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);
+    LOG.debug("TezDir path set " + tezDir + " for user: " + userName);
+    // since we are adding the user name to the scratch dir, we do not
+    // need to give more permissions here
     fs.mkdirs(tezDir);
+
     return tezDir;
+
   }
 
   /**

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Tue Apr  8 18:22:02 2014
@@ -21,11 +21,15 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 
+import javax.security.auth.login.LoginException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * This class is for managing multiple tez sessions particularly when
@@ -213,6 +217,18 @@ public class TezSessionPoolManager {
       return false;
     }
 
+    try {
+      UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+      String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
+      LOG.info("The current user: " + userName + ", session user: " + session.getUser());
+      if (userName.equals(session.getUser()) == false) {
+        LOG.info("Different users incoming: " + userName + " existing: " + session.getUser());
+        return false;
+      }
+    } catch (Exception e) {
+      throw new HiveException(e);
+    }
+
     HiveConf existingConf = session.getConf();
     if (existingConf == null) {
       return false;

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Tue Apr  8 18:22:02 2014
@@ -38,10 +38,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.PreWarmContext;
@@ -68,6 +73,7 @@ public class TezSessionState {
   private DagUtils utils;
   private String queueName;
   private boolean defaultQueue = false;
+  private String user;
 
   private HashSet<String> additionalAmFiles = null;
 
@@ -127,6 +133,11 @@ public class TezSessionState {
     throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
     this.conf = conf;
 
+    UserGroupInformation ugi;
+    ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+    user = ShimLoader.getHadoopShims().getShortUserName(ugi);
+    LOG.info("User of session id " + sessionId + " is " + user);
+
     // create the tez tmp dir
     tezScratchDir = createTezDir(sessionId);
 
@@ -219,8 +230,7 @@ public class TezSessionState {
     }
 
     if (!keepTmpDir) {
-      FileSystem fs = tezScratchDir.getFileSystem(conf);
-      fs.delete(tezScratchDir, true);
+      cleanupScratchDir();
     }
     session = null;
     tezScratchDir = null;
@@ -229,6 +239,12 @@ public class TezSessionState {
     additionalAmFiles = null;
   }
 
+  public void cleanupScratchDir () throws IOException {
+    FileSystem fs = tezScratchDir.getFileSystem(conf);
+    fs.delete(tezScratchDir, true);
+    tezScratchDir = null;
+  }
+
   public String getSessionId() {
     return sessionId;
   }
@@ -257,7 +273,8 @@ public class TezSessionState {
         TEZ_DIR);
     tezDir = new Path(tezDir, sessionId);
     FileSystem fs = tezDir.getFileSystem(conf);
-    fs.mkdirs(tezDir);
+    FsPermission fsPermission = new FsPermission((short)00777);
+    Utilities.createDirsWithPermission(conf, tezDir, fsPermission, true);
 
     // don't keep the directory around on non-clean exit
     fs.deleteOnExit(tezDir);
@@ -313,6 +330,7 @@ public class TezSessionState {
 
     Path localFile = new Path(localJarPath);
     String sha = getSha(localFile);
+
     String destFileName = localFile.getName();
 
     // Now, try to find the file based on SHA and name. Currently we require exact name match.
@@ -326,19 +344,6 @@ public class TezSessionState {
 
     // TODO: if this method is ever called on more than one jar, getting the dir and the
     //       list need to be refactored out to be done only once.
-    Path jarPath = null;
-    FileStatus[] listFileStatus = destFs.listStatus(destDirPath);
-    for (FileStatus fstatus : listFileStatus) {
-      String jarName = utils.getResourceBaseName(fstatus.getPath().toString()); // ...
-      if (jarName.equals(destFileName)) {
-        // We have found the jar we need.
-        jarPath = fstatus.getPath();
-        return utils.localizeResource(null, jarPath, conf);
-      }
-    }
-
-    // Jar wasn't in the directory, copy the one in current use.
-    assert jarPath == null;
     Path destFile = new Path(destDirPath.toString() + "/" + destFileName);
     return utils.localizeResource(localFile, destFile, conf);
   }
@@ -386,4 +391,8 @@ public class TezSessionState {
   public HiveConf getConf() {
     return conf;
   }
+
+  public String getUser() {
+    return user;
+  }
 }

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Apr  8 18:22:02 2014
@@ -126,7 +126,9 @@ public class TezTask extends Task<TezWor
       // we will localize all the files (jars, plans, hashtables) to the
       // scratch dir. let's create this and tmp first.
       Path scratchDir = ctx.getMRScratchDir();
-      utils.createTezDir(scratchDir, conf);
+
+      // create the tez tmp dir
+      scratchDir = utils.createTezDir(scratchDir, conf);
 
       // we need to get the user specified local resources for this dag
       String hiveJarDir = utils.getHiveJarDirectory(conf);
@@ -216,8 +218,7 @@ public class TezTask extends Task<TezWor
     List<BaseWork> ws = work.getAllWork();
     Collections.reverse(ws);
 
-    Path tezDir = utils.getTezDir(scratchDir);
-    FileSystem fs = tezDir.getFileSystem(conf);
+    FileSystem fs = scratchDir.getFileSystem(conf);
 
     // the name of the dag is what is displayed in the AM/Job UI
     DAG dag = new DAG(work.getName());
@@ -272,7 +273,7 @@ public class TezTask extends Task<TezWor
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, w);
-        Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr, 
+        Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, 
           additionalLr, fs, ctx, !isFinal, work);
         dag.addVertex(wx);
         utils.addCredentials(w, dag);

Modified: hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java?rev=1585809&r1=1585808&r2=1585809&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java (original)
+++ hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java Tue Apr  8 18:22:02 2014
@@ -24,6 +24,8 @@ import java.net.URISyntaxException;
 import javax.security.auth.login.LoginException;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.dag.api.TezException;
 
 
@@ -37,6 +39,7 @@ public class TestTezSessionState extends
   private boolean open;
   private String sessionId;
   private HiveConf hiveConf;
+  private String user;
 
   public TestTezSessionState(String sessionId) {
     super(sessionId);
@@ -53,9 +56,13 @@ public class TestTezSessionState extends
   }
 
   @Override
-  public void open(HiveConf conf) {
-    this.hiveConf = conf;
-  }
+    public void open(HiveConf conf) throws IOException,
+           LoginException, URISyntaxException, TezException {
+             this.hiveConf = conf;
+             UserGroupInformation ugi;
+             ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
+             user = ShimLoader.getHadoopShims().getShortUserName(ugi);
+    }
 
   @Override
     public void close(boolean keepTmpDir) throws TezException, IOException {
@@ -70,4 +77,8 @@ public class TestTezSessionState extends
     public String getSessionId() {
       return sessionId;
     }
+  
+  public String getUser() {
+    return user;
+  }
 }



Mime
View raw message