hive-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1520711 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql: ./ exec/ exec/mr/ io/ log/ optimizer/physical/ plan/
Date Fri, 06 Sep 2013 22:41:28 GMT
Author: hashutosh
Date: Fri Sep  6 22:41:28 2013
New Revision: 1520711

URL: http://svn.apache.org/r1520711
Log:
HIVE-5182 : log more stuff via PerfLogger (Sergey Shelukhin via Ashutosh Chauhan)
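
The pattern repeated throughout the hunks below brackets each timed phase with
PerfLogBegin/PerfLogEnd on the PerfLogger returned by PerfLogger.getPerfLogger().
A minimal sketch of that pattern, assuming a commons-logging Log field named LOG
like the ones already present in these classes:

    PerfLogger perfLogger = PerfLogger.getPerfLogger();
    perfLogger.PerfLogBegin(LOG, PerfLogger.PARSE);
    ASTNode tree = pd.parse(command, ctx);  // the work being timed
    perfLogger.PerfLogEnd(LOG, PerfLogger.PARSE);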

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/Driver.java Fri Sep  6 22:41:28 2013
@@ -419,10 +419,13 @@ public class Driver implements CommandPr
       ctx.setCmd(command);
       ctx.setHDFSCleanup(true);
 
+      perfLogger.PerfLogBegin(LOG, PerfLogger.PARSE);
       ParseDriver pd = new ParseDriver();
       ASTNode tree = pd.parse(command, ctx);
       tree = ParseUtils.findRootNonNullToken(tree);
+      perfLogger.PerfLogEnd(LOG, PerfLogger.PARSE);
 
+      perfLogger.PerfLogBegin(LOG, PerfLogger.ANALYZE);
       BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
       List<HiveSemanticAnalyzerHook> saHooks =
           getHooks(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK,
@@ -448,6 +451,7 @@ public class Driver implements CommandPr
 
       // validate the plan
       sem.validate();
+      perfLogger.PerfLogEnd(LOG, PerfLogger.ANALYZE);
 
       plan = new QueryPlan(command, sem, perfLogger.getStartTime(PerfLogger.DRIVER_RUN));
 
@@ -1179,11 +1183,13 @@ public class Driver implements CommandPr
       }
 
       perfLogger.PerfLogEnd(LOG, PerfLogger.TIME_TO_SUBMIT);
+      perfLogger.PerfLogBegin(LOG, PerfLogger.RUN_TASKS);
       // Loop while you either have tasks running, or tasks queued up
       while (running.size() != 0 || runnable.peek() != null) {
         // Launch upto maxthreads tasks
         while (runnable.peek() != null && running.size() < maxthreads) {
           Task<? extends Serializable> tsk = runnable.remove();
+          perfLogger.PerfLogBegin(LOG, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
           launchTask(tsk, queryId, noName, running, jobname, jobs, driverCxt);
         }
 
@@ -1191,6 +1197,7 @@ public class Driver implements CommandPr
         TaskResult tskRes = pollTasks(running.keySet());
         TaskRunner tskRun = running.remove(tskRes);
         Task<? extends Serializable> tsk = tskRun.getTask();
+        perfLogger.PerfLogEnd(LOG, PerfLogger.TASK + tsk.getName() + "." + tsk.getId());
         hookContext.addCompleteTask(tskRun);
 
         int exitVal = tskRes.getExitVal();
@@ -1254,6 +1261,7 @@ public class Driver implements CommandPr
           }
         }
       }
+      perfLogger.PerfLogEnd(LOG, PerfLogger.RUN_TASKS);
 
       // in case we decided to run everything in local mode, restore the
       // the jobtracker setting to its initial value
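
Each launched task also gets its own perf key, formed by appending the task name
and id to the TASK prefix defined in PerfLogger.java below. A hedged illustration
of the resulting key; the example name and id are assumptions, not values from
the diff:

    // The same key is built at launch and at completion, so begin/end pair up:
    String key = PerfLogger.TASK + tsk.getName() + "." + tsk.getId();
    // e.g. "task.MAPRED.Stage-1" for a hypothetical MapReduce stage
    perfLogger.PerfLogBegin(LOG, key);  // just before launchTask(...)
    perfLogger.PerfLogEnd(LOG, key);    // after pollTasks(...) reports it done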

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Sep  6 22:41:28 2013
@@ -112,6 +112,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -268,7 +269,7 @@ public final class Utilities {
           localPath = new Path(name);
         }
         InputStream in = new FileInputStream(localPath.toUri().getPath());
-        BaseWork ret = deserializeObject(in);
+        BaseWork ret = deserializePlan(in);
         gWork = ret;
         gWorkMap.put(path, gWork);
       }
@@ -479,7 +480,7 @@ public final class Utilities {
       // use the default file system of the conf
       FileSystem fs = planPath.getFileSystem(conf);
       FSDataOutputStream out = fs.create(planPath);
-      serializeObject(w, out);
+      serializePlan(w, out);
 
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
@@ -587,6 +588,47 @@ public final class Utilities {
   }
 
   /**
+   * Serializes the plan.
+   * @param plan The plan, such as QueryPlan, MapredWork, etc.
+   * @param out The stream to write to.
+   */
+  public static void serializePlan(Object plan, OutputStream out) {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.SERIALIZE_PLAN);
+    serializeObject(plan, out);
+    perfLogger.PerfLogEnd(LOG, PerfLogger.SERIALIZE_PLAN);
+  }
+
+  /**
+   * Deserializes the plan.
+   * @param in The stream to read from.
+   * @return The plan, such as QueryPlan, MapredWork, etc.
+   */
+  public static <T> T deserializePlan(InputStream in) {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.DESERIALIZE_PLAN);
+    T result = deserializeObject(in);
+    perfLogger.PerfLogEnd(LOG, PerfLogger.DESERIALIZE_PLAN);
+    return result;
+  }
+
+  /**
+   * Clones using the powers of XML. Do not use unless necessary.
+   * @param plan The plan.
+   * @return The clone.
+   */
+  public static <T> T clonePlan(T plan) {
+    // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.CLONE_PLAN);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Utilities.serializeObject(plan, baos);
+    T copy = Utilities.deserializeObject(new ByteArrayInputStream(baos.toByteArray()));
+    perfLogger.PerfLogEnd(LOG, PerfLogger.CLONE_PLAN);
+    return copy;
+  }
+
+  /**
    * Serialize the object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
@@ -1802,6 +1844,8 @@ public final class Utilities {
    */
   public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
       throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.INPUT_SUMMARY);
 
     long[] summary = {0, 0, 0};
 
@@ -1937,6 +1981,7 @@ public final class Utilities {
               + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
         }
 
+        perfLogger.PerfLogEnd(LOG, PerfLogger.INPUT_SUMMARY);
         return new ContentSummary(summary[0], summary[1], summary[2]);
       } finally {
         HiveInterruptUtils.remove(interrup);
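
The three new helpers route all plan serialization through one place so that each
operation is timed under a single key. A short usage sketch, assuming a MapredWork
and open streams like those at the call sites changed in this commit:

    // Timed under "serializePlan":
    Utilities.serializePlan(plan, out);
    // Timed under "deserializePlan":
    MapredWork readBack = Utilities.deserializePlan(in);
    // Serialize/deserialize round trip as a deep copy, timed under "clonePlan":
    MapredWork copy = Utilities.clonePlan(plan);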

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Fri Sep  6 22:41:28 2013
@@ -716,12 +716,12 @@ public class ExecDriver extends Task<Map
     int ret;
     if (localtask) {
       memoryMXBean = ManagementFactory.getMemoryMXBean();
-      MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
+      MapredLocalWork plan = (MapredLocalWork) Utilities.deserializePlan(pathData);
       MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
       ret = ed.executeFromChildJVM(new DriverContext());
 
     } else {
-      MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
+      MapredWork plan = (MapredWork) Utilities.deserializePlan(pathData);
       ExecDriver ed = new ExecDriver(plan, conf, isSilent);
       ret = ed.execute(new DriverContext());
     }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Fri Sep  6 22:41:28 2013
@@ -181,7 +181,7 @@ public class MapRedTask extends ExecDriv
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeObject(plan, out);
+      Utilities.serializePlan(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System
           .getProperty("test.silent")) ? "-nolog" : "";

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Fri Sep  6 22:41:28 2013
@@ -141,7 +141,7 @@ public class MapredLocalTask extends Tas
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredLocalWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeObject(plan, out);
+      Utilities.serializePlan(plan, out);
 
       String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Fri Sep  6 22:41:28 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -262,6 +263,8 @@ public class CombineHiveInputFormat<K ex
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
     init(job);
     Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
     Map<String, Operator<? extends OperatorDesc>> aliasToWork =
@@ -269,8 +272,11 @@ public class CombineHiveInputFormat<K ex
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
 
+    InputSplit[] splits = null;
     if (combine == null) {
-      return super.getSplits(job, numSplits);
+      splits = super.getSplits(job, numSplits);
+      perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+      return splits;
     }
 
     if (combine.getInputPathsShim(job).length == 0) {
@@ -320,9 +326,10 @@ public class CombineHiveInputFormat<K ex
           // If path is a directory
           if (fStats.isDir()) {
             dirs.offer(path);
-          }
-          else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
-            return super.getSplits(job, numSplits);
+          } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
+            splits = super.getSplits(job, numSplits);
+            perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+            return splits;
           }
 
           while (dirs.peek() != null) {
@@ -331,9 +338,11 @@ public class CombineHiveInputFormat<K ex
             for (int idx = 0; idx < fStatus.length; idx++) {
               if (fStatus[idx].isDir()) {
                 dirs.offer(fStatus[idx].getPath());
-              }
-              else if ((new CompressionCodecFactory(job)).getCodec(fStatus[idx].getPath()) != null) {
-                return super.getSplits(job, numSplits);
+              } else if ((new CompressionCodecFactory(job)).getCodec(
+                  fStatus[idx].getPath()) != null) {
+                splits = super.getSplits(job, numSplits);
+                perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+                return splits;
               }
             }
           }
@@ -341,7 +350,9 @@ public class CombineHiveInputFormat<K ex
       }
 
       if (inputFormat instanceof SymlinkTextInputFormat) {
-        return super.getSplits(job, numSplits);
+        splits = super.getSplits(job, numSplits);
+        perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+        return splits;
       }
 
       Path filterPath = path;
@@ -411,10 +422,11 @@ public class CombineHiveInputFormat<K ex
     }
 
     LOG.info("number of splits " + result.size());
+    perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
-    private void processPaths(JobConf job, CombineFileInputFormatShim combine,
+  private void processPaths(JobConf job, CombineFileInputFormatShim combine,
       List<InputSplitShim> iss, Path... path) throws IOException {
     JobConf currJob = new JobConf(job);
     FileInputFormat.setInputPaths(currJob, path);
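
Because getSplits() returns from several places, the hunk above adds a PerfLogEnd
before each early return. An alternative sketch, not what this commit does and
with computeSplits() as a hypothetical helper holding the existing body, would
cover every exit with a single end call:

    perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
    try {
      return computeSplits(job, numSplits);  // hypothetical: the current method body
    } finally {
      perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
    }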

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Fri Sep  6 22:41:28 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.io.HiveIOE
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -257,7 +258,8 @@ public class HiveInputFormat<K extends W
   }
 
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
     init(job);
 
     Path[] dirs = FileInputFormat.getInputPaths(job);
@@ -296,7 +298,7 @@ public class HiveInputFormat<K extends W
     }
 
     LOG.info("number of splits " + result.size());
-
+    perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java Fri Sep  6 22:41:28 2013
@@ -34,8 +34,17 @@ import org.apache.hadoop.hive.ql.session
 public class PerfLogger {
   public static final String ACQUIRE_READ_WRITE_LOCKS = "acquireReadWriteLocks";
   public static final String COMPILE = "compile";
+  public static final String PARSE = "parse";
+  public static final String ANALYZE = "semanticAnalyze";
   public static final String DO_AUTHORIZATION = "doAuthorization";
   public static final String DRIVER_EXECUTE = "Driver.execute";
+  public static final String INPUT_SUMMARY = "getInputSummary";
+  public static final String GET_SPLITS = "getSplits";
+  public static final String RUN_TASKS = "runTasks";
+  public static final String SERIALIZE_PLAN = "serializePlan";
+  public static final String DESERIALIZE_PLAN = "deserializePlan";
+  public static final String CLONE_PLAN = "clonePlan";
+  public static final String TASK = "task.";
   public static final String RELEASE_LOCKS = "releaseLocks";
   public static final String PRUNE_LISTING = "prune-listing";
   public static final String PARTITION_RETRIEVING = "partition-retrieving";

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Fri Sep  6 22:41:28 2013
@@ -502,9 +502,7 @@ public class CommonJoinTaskDispatcher ex
         }
         // deep copy a new mapred work from xml
         // Once HIVE-4396 is in, it would be faster to use a cheaper method to clone the plan
-        String xml = currTask.getWork().toXML();
-        InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-        MapredWork newWork = Utilities.deserializeObject(in);
+        MapredWork newWork = Utilities.clonePlan(currTask.getWork());
 
         // create map join task and set big table as i
         ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java Fri Sep  6 22:41:28 2013
@@ -241,16 +241,7 @@ public final class GenMRSkewJoinProcesso
           HiveConf.ConfVars.HIVE_MAPPER_CANNOT_SPAN_MULTIPLE_PARTITIONS);
       newPlan.setMapperCannotSpanPartns(mapperCannotSpanPartns);
 
-      MapredWork clonePlan = null;
-      try {
-        String xmlPlan = currPlan.toXML();
-        StringBuilder sb = new StringBuilder(xmlPlan);
-        ByteArrayInputStream bis;
-        bis = new ByteArrayInputStream(sb.toString().getBytes("UTF-8"));
-        clonePlan = Utilities.deserializeObject(bis);
-      } catch (UnsupportedEncodingException e) {
-        throw new SemanticException(e);
-      }
+      MapredWork clonePlan = Utilities.clonePlan(currPlan);
 
       Operator<? extends OperatorDesc>[] parentOps = new TableScanOperator[tags.length];
       for (int k = 0; k < tags.length; k++) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Fri Sep  6 22:41:28 2013
@@ -148,11 +148,8 @@ public class SortMergeJoinTaskDispatcher
   private MapredWork convertSMBWorkToJoinWork(MapredWork currWork, SMBMapJoinOperator oldSMBJoinOp)
       throws SemanticException {
     try {
-      String xml = currWork.toXML();
-
       // deep copy a new mapred work
-      InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-      MapredWork currJoinWork = Utilities.deserializeObject(in);
+      MapredWork currJoinWork = Utilities.clonePlan(currWork);
       SMBMapJoinOperator newSMBJoinOp = getSMBMapJoinOp(currJoinWork);
 
       // Add the row resolver for the new operator
@@ -169,14 +166,13 @@ public class SortMergeJoinTaskDispatcher
   }
 
   // create map join task and set big table as bigTablePosition
-  private ObjectPair<MapRedTask, String> convertSMBTaskToMapJoinTask(String xml,
+  private ObjectPair<MapRedTask, String> convertSMBTaskToMapJoinTask(MapredWork origWork,
       int bigTablePosition,
       SMBMapJoinOperator smbJoinOp,
       QBJoinTree joinTree)
       throws UnsupportedEncodingException, SemanticException {
-    // deep copy a new mapred work from xml
-    InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-    MapredWork newWork = Utilities.deserializeObject(in);
+    // deep copy a new mapred work
+    MapredWork newWork = Utilities.clonePlan(origWork);
     // create a mapred task for this work
     MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
         .getParseContext().getConf());
@@ -290,7 +286,6 @@ public class SortMergeJoinTaskDispatcher
       long aliasTotalKnownInputSize = getTotalKnownInputSize(context, currJoinWork.getMapWork(),
           pathToAliases, aliasToSize);
 
-      String xml = currJoinWork.toXML();
       long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
           HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
 
@@ -301,8 +296,8 @@ public class SortMergeJoinTaskDispatcher
         }
 
         // create map join task for the given big table position
-        ObjectPair<MapRedTask, String> newTaskAlias =
-            convertSMBTaskToMapJoinTask(xml, bigTablePosition, newSMBJoinOp, joinTree);
+        ObjectPair<MapRedTask, String> newTaskAlias = convertSMBTaskToMapJoinTask(
+            currJoinWork, bigTablePosition, newSMBJoinOp, joinTree);
         MapRedTask newTask = newTaskAlias.getFirst();
         String bigTableAlias = newTaskAlias.getSecond();
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java?rev=1520711&r1=1520710&r2=1520711&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredWork.java Fri Sep  6 22:41:28 2013
@@ -82,10 +82,4 @@ public class MapredWork extends Abstract
 
     return ops;
   }
-
-  public String toXML() {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    Utilities.serializeObject(this, baos);
-    return (baos.toString());
-  }
 }


