hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject svn commit: r1585602 - in /hive/trunk/ql/src: java/org/apache/hadoop/hive/ql/exec/tez/ java/org/apache/hadoop/hive/ql/optimizer/ java/org/apache/hadoop/hive/ql/plan/ java/org/apache/hadoop/hive/ql/session/ test/org/apache/hadoop/hive/ql/exec/tez/
Date Mon, 07 Apr 2014 22:06:09 GMT
Author: sershe
Date: Mon Apr  7 22:06:09 2014
New Revision: 1585602

URL: http://svn.apache.org/r1585602
Log:
HIVE-6739 : Hive HBase query fails on Tez due to missing jars and then due to NPE in getSplits
(Sergey Shelukhin, reviewed by Vikram Dixit K)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Apr  7 22:06:09 2014
@@ -604,7 +604,7 @@ public class DagUtils {
     combinedResources.putAll(sessionConfig.getSessionResources());
 
     try {
-      for(LocalResource lr : localizeTempFiles(conf)) {
+      for(LocalResource lr : localizeTempFilesFromConf(getHiveJarDirectory(conf), conf)) {
         combinedResources.put(getBaseName(lr), lr);
       }
     } catch(LoginException le) {
@@ -665,7 +665,8 @@ public class DagUtils {
    * @throws IOException when hdfs operation fails
    * @throws LoginException when getDefaultDestDir fails with the same exception
    */
-  public List<LocalResource> localizeTempFiles(Configuration conf) throws IOException, LoginException {
+  public List<LocalResource> localizeTempFilesFromConf(
+      String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
     List<LocalResource> tmpResources = new ArrayList<LocalResource>();
 
     String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
@@ -683,15 +684,32 @@ public class DagUtils {
 
     String auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS);
 
-    // need to localize the additional jars and files
+    String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
+    addTempFiles(conf, tmpResources, hdfsDirPathStr, allFiles.split(","));
+    return tmpResources;
+  }
 
-    // we need the directory on hdfs to which we shall put all these files
-    // Use HIVE_JAR_DIRECTORY only if it's set explicitly; otherwise use default directory
-    String hdfsDirPathStr = getHiveJarDirectory(conf);
+  /**
+   * Localizes files, archives and jars from a provided array of names.
+   * @param hdfsDirPathStr Destination directory in HDFS.
+   * @param conf Configuration.
+   * @param inputOutputJars The file names to localize.
+   * @return List<LocalResource> local resources to add to execution
+   * @throws IOException when hdfs operation fails.
+   * @throws LoginException when getDefaultDestDir fails with the same exception
+   */
+  public List<LocalResource> localizeTempFiles(String hdfsDirPathStr, Configuration conf,
+      String[] inputOutputJars) throws IOException, LoginException {
+    if (inputOutputJars == null) return null;
+    List<LocalResource> tmpResources = new ArrayList<LocalResource>();
+    addTempFiles(conf, tmpResources, hdfsDirPathStr, inputOutputJars);
+    return tmpResources;
+  }
 
-    String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
-    String[] allFilesArr = allFiles.split(",");
-    for (String file : allFilesArr) {
+  private void addTempFiles(Configuration conf,
+      List<LocalResource> tmpResources, String hdfsDirPathStr,
+      String[] files) throws IOException {
+    for (String file : files) {
       if (!StringUtils.isNotBlank(file)) {
         continue;
       }
@@ -700,8 +718,6 @@ public class DagUtils {
           new Path(hdfsFilePathStr), conf);
       tmpResources.add(localResource);
     }
-
-    return tmpResources;
   }
 
   public String getHiveJarDirectory(Configuration conf) throws IOException, LoginException {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Mon Apr  7 22:06:09 2014
@@ -64,7 +64,7 @@ public class TezSessionPoolManager {
       HiveConf newConf = new HiveConf(initConf);
       TezSessionState sessionState = defaultQueuePool.take();
       newConf.set("tez.queue.name", sessionState.getQueueName());
-      sessionState.open(TezSessionState.makeSessionId(), newConf);
+      sessionState.open(newConf);
       defaultQueuePool.put(sessionState);
     }
   }
@@ -91,7 +91,7 @@ public class TezSessionPoolManager {
         if (queue.length() == 0) {
           continue;
         }
-        TezSessionState sessionState = createSession();
+        TezSessionState sessionState = createSession(TezSessionState.makeSessionId());
         sessionState.setQueueName(queue);
         sessionState.setDefault();
         LOG.info("Created new tez session for queue: " + queue +
@@ -102,7 +102,7 @@ public class TezSessionPoolManager {
     }
   }
 
-  private TezSessionState getSession(HiveConf conf)
+  private TezSessionState getSession(HiveConf conf, boolean doOpen)
       throws Exception {
 
     String queueName = conf.get("tez.queue.name");
@@ -120,7 +120,7 @@ public class TezSessionPoolManager {
       LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
           " defaultQueuePool: " + defaultQueuePool +
           " blockingQueueLength: " + blockingQueueLength);
-      return getNewSessionState(conf, queueName);
+      return getNewSessionState(conf, queueName, doOpen);
     }
 
     LOG.info("Choosing a session from the defaultQueuePool");
@@ -130,16 +130,21 @@ public class TezSessionPoolManager {
   /**
    * @param conf HiveConf that is used to initialize the session
    * @param queueName could be null. Set in the tez session.
+   * @param doOpen
    * @return
    * @throws Exception
    */
   private TezSessionState getNewSessionState(HiveConf conf,
-      String queueName) throws Exception {
-    TezSessionState retTezSessionState = createSession();
+      String queueName, boolean doOpen) throws Exception {
+    TezSessionState retTezSessionState = createSession(TezSessionState.makeSessionId());
     retTezSessionState.setQueueName(queueName);
-    retTezSessionState.open(TezSessionState.makeSessionId(), conf);
+    String what = "Created";
+    if (doOpen) {
+      retTezSessionState.open(conf);
+      what = "Started";
+    }
 
-    LOG.info("Started a new session for queue: " + queueName +
+    LOG.info(what + " a new session for queue: " + queueName +
         " session id: " + retTezSessionState.getSessionId());
     return retTezSessionState;
   }
@@ -179,11 +184,12 @@ public class TezSessionPoolManager {
     }
   }
 
-  protected TezSessionState createSession() {
-    return new TezSessionState();
+  protected TezSessionState createSession(String sessionId) {
+    return new TezSessionState(sessionId);
   }
 
-  public TezSessionState getSession(TezSessionState session, HiveConf conf) throws Exception {
+  public TezSessionState getSession(
+      TezSessionState session, HiveConf conf, boolean doOpen) throws Exception {
     if (canWorkWithSameSession(session, conf)) {
       return session;
     }
@@ -192,7 +198,7 @@ public class TezSessionPoolManager {
       session.close(false);
     }
 
-    return getSession(conf);
+    return getSession(conf, doOpen);
   }
 
   /*

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Mon Apr  7 22:06:09 2014
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +69,8 @@ public class TezSessionState {
   private String queueName;
   private boolean defaultQueue = false;
 
+  private HashSet<String> additionalAmFiles = null;
+
   private static List<TezSessionState> openSessions
     = Collections.synchronizedList(new LinkedList<TezSessionState>());
 
@@ -83,8 +86,9 @@ public class TezSessionState {
    * Constructor. We do not automatically connect, because we only want to
    * load tez classes when the user has tez installed.
    */
-  public TezSessionState() {
+  public TezSessionState(String sessionId) {
     this(DagUtils.getInstance());
+    this.sessionId = sessionId;
   }
 
   /**
@@ -106,6 +110,11 @@ public class TezSessionState {
     return UUID.randomUUID().toString();
   }
 
+  public void open(HiveConf conf)
+      throws IOException, LoginException, URISyntaxException, TezException {
+    open(conf, null);
+  }
+
   /**
    * Creates a tez session. A session is tied to either a cli/hs2 session. You can
    * submit multiple DAGs against a session (as long as they are executed serially).
@@ -114,10 +123,8 @@ public class TezSessionState {
    * @throws LoginException
    * @throws TezException
    */
-  public void open(String sessionId, HiveConf conf)
-    throws IOException, LoginException, URISyntaxException, TezException {
-
-    this.sessionId = sessionId;
+  public void open(HiveConf conf, List<LocalResource> additionalLr)
+    throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException {
     this.conf = conf;
 
     // create the tez tmp dir
@@ -135,6 +142,14 @@ public class TezSessionState {
     // configuration for the application master
     Map<String, LocalResource> commonLocalResources = new HashMap<String, LocalResource>();
     commonLocalResources.put(utils.getBaseName(appJarLr), appJarLr);
+    if (additionalLr != null) {
+      additionalAmFiles = new HashSet<String>();
+      for (LocalResource lr : additionalLr) {
+        String baseName = utils.getBaseName(lr);
+        additionalAmFiles.add(baseName);
+        commonLocalResources.put(baseName, lr);
+      }
+    }
 
     // Create environment for AM.
     Map<String, String> amEnv = new HashMap<String, String>();
@@ -174,6 +189,15 @@ public class TezSessionState {
     openSessions.add(this);
   }
 
+  public boolean hasResources(List<LocalResource> lrs) {
+    if (lrs == null || lrs.isEmpty()) return true;
+    if (additionalAmFiles == null || additionalAmFiles.isEmpty()) return false;
+    for (LocalResource lr : lrs) {
+      if (!additionalAmFiles.contains(utils.getBaseName(lr))) return false;
+    }
+    return true;
+  }
+
   /**
    * Close a tez session. Will cleanup any tez/am related resources. After closing a session
    * no further DAGs can be executed against it.
@@ -202,6 +226,7 @@ public class TezSessionState {
     tezScratchDir = null;
     conf = null;
     appJarLr = null;
+    additionalAmFiles = null;
   }
 
   public String getSessionId() {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Apr  7 22:06:09 2014
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -103,45 +104,62 @@ public class TezTask extends Task<TezWor
     TezSessionState session = null;
 
     try {
-      // Get or create Context object. If we create it we have to clean
-      // it later as well.
+      // Get or create Context object. If we create it we have to clean it later as well.
       ctx = driverContext.getCtx();
       if (ctx == null) {
         ctx = new Context(conf);
         cleanContext = true;
       }
 
-      // Need to remove this static hack. But this is the way currently to
-      // get a session.
+      // Need to remove this static hack. But this is the way currently to get a session.
       SessionState ss = SessionState.get();
       session = ss.getTezSession();
-      session = TezSessionPoolManager.getInstance().getSession(session, conf);
+      session = TezSessionPoolManager.getInstance().getSession(session, conf, false);
       ss.setTezSession(session);
 
-      // if it's not running start it.
-      if (!session.isOpen()) {
-        // can happen if the user sets the tez flag after the session was
-        // established
-        LOG.info("Tez session hasn't been created yet. Opening session");
-        session.open(session.getSessionId(), conf);
-      }
+      // jobConf will hold all the configuration for hadoop, tez, and hive
+      JobConf jobConf = utils.createConfiguration(conf);
+
+      // Get all user jars from work (e.g. input format stuff).
+      String[] inputOutputJars = work.configureJobConfAndExtractJars(jobConf);
 
       // we will localize all the files (jars, plans, hashtables) to the
-      // scratch dir. let's create this first.
+      // scratch dir. let's create this and tmp first.
       Path scratchDir = ctx.getMRScratchDir();
-
-      // create the tez tmp dir
       utils.createTezDir(scratchDir, conf);
 
-      // jobConf will hold all the configuration for hadoop, tez, and hive
-      JobConf jobConf = utils.createConfiguration(conf);
+      // we need to get the user specified local resources for this dag
+      String hiveJarDir = utils.getHiveJarDirectory(conf);
+      List<LocalResource> additionalLr = utils.localizeTempFilesFromConf(hiveJarDir, conf);
+      List<LocalResource> handlerLr = utils.localizeTempFiles(hiveJarDir, conf, inputOutputJars);
+      if (handlerLr != null) {
+        additionalLr.addAll(handlerLr);
+      }
+
+      // If we have any jars from input format, we need to restart the session because
+      // AM will need them; so, AM has to be restarted. What a mess...
+      if (!session.hasResources(handlerLr)) {
+        if (session.isOpen()) {
+          LOG.info("Tez session being reopened to pass custom jars to AM");
+          session.close(false);
+          session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
+          ss.setTezSession(session);
+        }
+        session.open(conf, additionalLr);
+      }
+      if (!session.isOpen()) {
+        // can happen if the user sets the tez flag after the session was
+        // established
+        LOG.info("Tez session hasn't been created yet. Opening session");
+        session.open(conf);
+      }
 
       // unless already installed on all the cluster nodes, we'll have to
       // localize hive-exec.jar as well.
       LocalResource appJarLr = session.getAppJarLr();
 
       // next we translate the TezWork to a Tez DAG
-      DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
+      DAG dag = build(jobConf, work, scratchDir, appJarLr, additionalLr, ctx);
 
       // submit will send the job to the cluster and start executing
       client = submit(jobConf, dag, scratchDir, appJarLr, session);
@@ -186,16 +204,13 @@ public class TezTask extends Task<TezWor
   }
 
   DAG build(JobConf conf, TezWork work, Path scratchDir,
-      LocalResource appJarLr, Context ctx)
+      LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
       throws Exception {
 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_BUILD_DAG);
     Map<BaseWork, Vertex> workToVertex = new HashMap<BaseWork, Vertex>();
     Map<BaseWork, JobConf> workToConf = new HashMap<BaseWork, JobConf>();
 
-    // we need to get the user specified local resources for this dag
-    List<LocalResource> additionalLr = utils.localizeTempFiles(conf);
-
     // getAllWork returns a topologically sorted list, which we use to make
     // sure that vertices are created before they are used in edges.
     List<BaseWork> ws = work.getAllWork();
@@ -299,7 +314,7 @@ public class TezTask extends Task<TezWor
       sessionState.close(true);
 
       // (re)open the session
-      sessionState.open(sessionState.getSessionId(), this.conf);
+      sessionState.open(this.conf);
 
       console.printInfo("Session re-established.");
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Apr  7 22:06:09 2014
@@ -845,6 +845,13 @@ public final class GenMapRedUtils {
           setKeyAndValueDesc(work.getReduceWork(), op);
         }
       }
+    } else if (task != null && (task.getWork() instanceof TezWork)) {
+      TezWork work = (TezWork)task.getWork();
+      for (BaseWork w : work.getAllWorkUnsorted()) {
+        if (w instanceof MapWork) {
+          ((MapWork)w).deriveExplainAttributes();
+        }
+      }
     }
 
     if (task.getChildTasks() == null) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Mon Apr  7 22:06:09 2014
@@ -28,6 +28,7 @@ import java.util.Stack;
 
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * BaseWork. Base class for any "work" that's being done on the cluster. Items like stats
@@ -106,4 +107,6 @@ public abstract class BaseWork extends A
 
     return returnSet;
   }
+
+  public abstract void configureJobConf(JobConf job);
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Mon Apr  7 22:06:09 2014
@@ -172,6 +172,8 @@ public class MapWork extends BaseWork {
 
   /**
    * Derive additional attributes to be rendered by EXPLAIN.
+   * TODO: this method is relied upon by custom input formats to set jobconf properties.
+   *       This is madness? - This is Hive Storage Handlers!
    */
   public void deriveExplainAttributes() {
     if (pathToPartitionInfo != null) {
@@ -495,6 +497,7 @@ public class MapWork extends BaseWork {
         samplingType == 2 ? "SAMPLING_ON_START" : null;
   }
 
+  @Override
   public void configureJobConf(JobConf job) {
     for (PartitionDesc partition : aliasToPartnInfo.values()) {
       PlanUtils.configureJobConf(partition.getTableDesc(), job);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java Mon Apr  7 22:06:09 2014
@@ -33,6 +33,7 @@ import org.apache.commons.lang3.tuple.Pa
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.dag.api.EdgeProperty;
 
 /**
@@ -93,6 +94,10 @@ public class TezWork extends AbstractOpe
     return result;
   }
 
+  public Collection<BaseWork> getAllWorkUnsorted() {
+    return workGraph.keySet();
+  }
+
   private void visit(BaseWork child, Set<BaseWork> seen, List<BaseWork> result) {
 
     if (seen.contains(child)) {
@@ -271,6 +276,37 @@ public class TezWork extends AbstractOpe
     }
     return result;
   }
+  
+  private static final String MR_JAR_PROPERTY = "tmpjars";
+  /**
+   * Calls configureJobConf on instances of work that are part of this TezWork.
+   * Uses the passed job configuration to extract "tmpjars" added by these, so that Tez
+   * could add them to the job proper Tez way. This is a very hacky way but currently
+   * there's no good way to get these JARs - both storage handler interface, and HBase
+   * code, would have to change to get the list directly (right now it adds to tmpjars).
+   * This will happen in 0.14 hopefully.
+   * @param jobConf Job configuration.
+   * @return List of files added to tmpjars by storage handlers.
+   */
+  public String[] configureJobConfAndExtractJars(JobConf jobConf) {
+    String[] oldTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+    jobConf.setStrings(MR_JAR_PROPERTY, new String[0]);
+    for (BaseWork work : workGraph.keySet()) {
+      work.configureJobConf(jobConf);
+    }
+    String[] newTmpJars = jobConf.getStrings(MR_JAR_PROPERTY);
+    if (oldTmpJars != null && (oldTmpJars.length != 0)) {
+      if (newTmpJars != null && (newTmpJars.length != 0)) {
+        String[] combinedTmpJars = new String[newTmpJars.length + oldTmpJars.length];
+        System.arraycopy(oldTmpJars, 0, combinedTmpJars, 0, oldTmpJars.length);
+        System.arraycopy(newTmpJars, 0, combinedTmpJars, oldTmpJars.length, newTmpJars.length);
+        jobConf.setStrings(MR_JAR_PROPERTY, combinedTmpJars);
+      } else {
+        jobConf.setStrings(MR_JAR_PROPERTY, oldTmpJars);
+      }
+    }
+    return newTmpJars;
+   }
 
   /**
    * connect adds an edge between a and b. Both nodes have

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/UnionWork.java Mon Apr  7 22:06:09 2014
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.mapred.JobConf;
 
 /**
  * Simple wrapper for union all cases. All contributing work for a union all
@@ -68,4 +69,7 @@ public class UnionWork extends BaseWork 
   public Set<UnionOperator> getUnionOperators() {
     return unionOperators;
   }
+
+  public void configureJobConf(JobConf job) {
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Apr  7 22:06:09 2014
@@ -347,9 +347,9 @@ public class SessionState {
         .equals("tez") && (startSs.isHiveServerQuery == false)) {
       try {
         if (startSs.tezSessionState == null) {
-          startSs.tezSessionState = new TezSessionState();
+          startSs.tezSessionState = new TezSessionState(startSs.getSessionId());
         }
-        startSs.tezSessionState.open(startSs.getSessionId(), startSs.conf);
+        startSs.tezSessionState.open(startSs.conf);
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -815,10 +815,10 @@ public class SessionState {
   }
 
   public Set<String> list_resource(ResourceType t, List<String> filter) {
-    if (resource_map.get(t) == null) {
+    Set<String> orig = resource_map.get(t);
+    if (orig == null) {
       return null;
     }
-    Set<String> orig = resource_map.get(t);
     if (filter == null) {
       return orig;
     } else {

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java Mon Apr  7 22:06:09 2014
@@ -40,9 +40,9 @@ public class TestTezSessionPool {
     }
 
     @Override
-      public TezSessionState createSession() {
-        return new TestTezSessionState();
-      }
+    public TezSessionState createSession(String sessionId) {
+      return new TestTezSessionState(sessionId);
+    }
   }
 
   @Before
@@ -54,8 +54,8 @@ public class TestTezSessionPool {
     public void testGetNonDefaultSession() {
       poolManager = new TestTezSessionPoolManager();
       try {
-        TezSessionState sessionState = poolManager.getSession(null, conf);
-        TezSessionState sessionState1 = poolManager.getSession(sessionState, conf);
+        TezSessionState sessionState = poolManager.getSession(null, conf, true);
+        TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true);
         if (sessionState1 != sessionState) {
           fail();
         }
@@ -75,25 +75,25 @@ public class TestTezSessionPool {
         poolManager = new TestTezSessionPoolManager();
         poolManager.setupPool(conf);
         poolManager.startPool();
-        TezSessionState sessionState = poolManager.getSession(null, conf);
+        TezSessionState sessionState = poolManager.getSession(null, conf, true);
         if (sessionState.getQueueName().compareTo("a") != 0) {
           fail();
         }
         poolManager.returnSession(sessionState);
 
-        sessionState = poolManager.getSession(null, conf);
+        sessionState = poolManager.getSession(null, conf, true);
         if (sessionState.getQueueName().compareTo("b") != 0) {
           fail();
         }
         poolManager.returnSession(sessionState);
 
-        sessionState = poolManager.getSession(null, conf);
+        sessionState = poolManager.getSession(null, conf, true);
         if (sessionState.getQueueName().compareTo("c") != 0) {
           fail();
         }
         poolManager.returnSession(sessionState);
 
-        sessionState = poolManager.getSession(null, conf);
+        sessionState = poolManager.getSession(null, conf, true);
         if (sessionState.getQueueName().compareTo("a") != 0) {
           fail();
         }
@@ -118,7 +118,7 @@ public class TestTezSessionPool {
             tmpConf.set("tez.queue.name", "");
           }
 
-          TezSessionState session = poolManager.getSession(null, tmpConf);
+          TezSessionState session = poolManager.getSession(null, tmpConf, true);
           Thread.sleep((random.nextInt(9) % 10) * 1000);
           poolManager.returnSession(session);
         } catch (Exception e) {

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionState.java Mon Apr  7 22:06:09 2014
@@ -38,6 +38,11 @@ public class TestTezSessionState extends
   private String sessionId;
   private HiveConf hiveConf;
 
+  public TestTezSessionState(String sessionId) {
+    super(sessionId);
+    this.sessionId = sessionId;
+  }
+
   @Override
     public boolean isOpen() {
       return open;
@@ -48,11 +53,9 @@ public class TestTezSessionState extends
   }
 
   @Override
-    public void open(String sessionId, HiveConf conf) throws IOException,
-           LoginException, URISyntaxException, TezException {
-             this.sessionId = sessionId;
-             this.hiveConf = conf;
-    }
+  public void open(HiveConf conf) {
+    this.hiveConf = conf;
+  }
 
   @Override
     public void close(boolean keepTmpDir) throws TezException, IOException {

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java?rev=1585602&r1=1585601&r2=1585602&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java Mon Apr  7 22:06:09 2014
@@ -33,6 +33,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.security.auth.login.LoginException;
 
@@ -176,7 +177,7 @@ public class TestTezTask {
 
   @Test
   public void testBuildDag() throws IllegalArgumentException, IOException, Exception {
-    DAG dag = task.build(conf, work, path, appLr, new Context(conf));
+    DAG dag = task.build(conf, work, path, appLr, null, new Context(conf));
     for (BaseWork w: work.getAllWork()) {
       Vertex v = dag.getVertex(w.getName());
       assertNotNull(v);
@@ -196,7 +197,7 @@ public class TestTezTask {
 
   @Test
   public void testEmptyWork() throws IllegalArgumentException, IOException, Exception {
-    DAG dag = task.build(conf, new TezWork(""), path, appLr, new Context(conf));
+    DAG dag = task.build(conf, new TezWork(""), path, appLr, null, new Context(conf));
     assertEquals(dag.getVertices().size(), 0);
   }
 
@@ -206,7 +207,7 @@ public class TestTezTask {
     DAG dag = new DAG("test");
     task.submit(conf, dag, path, appLr, sessionState);
     // validate close/reopen
-    verify(sessionState, times(1)).open(any(String.class), any(HiveConf.class));
+    verify(sessionState, times(1)).open(any(HiveConf.class));
     verify(sessionState, times(1)).close(eq(true));
     verify(session, times(2)).submitDAG(any(DAG.class));
   }



Mime
View raw message