hadoop-hive-commits mailing list archives

From: na...@apache.org
Subject: svn commit: r934241 [1/16] - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ metastore/src/java/org/apache/hadoop/hive/metastore/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql...
Date: Wed, 14 Apr 2010 23:36:09 GMT
Author: namit
Date: Wed Apr 14 23:36:07 2010
New Revision: 934241

URL: http://svn.apache.org/viewvc?rev=934241&view=rev
Log:
HIVE-1002. multi-partition inserts
(Ning Zhang via namit)


Added:
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobCloseFeedBack.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part1.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part10.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part11.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part12.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part13.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part2.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part3.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part4.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part5.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part6.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part7.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part8.q
    hadoop/hive/trunk/ql/src/test/queries/clientpositive/load_dyn_part9.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part1.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part10.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part11.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part12.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part13.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part2.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part3.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part4.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part5.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part6.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part7.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part8.q.out
    hadoop/hive/trunk/ql/src/test/results/clientpositive/load_dyn_part9.q.out
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/SubStructObjectInspector.java
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hadoop/hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspector.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorUtils.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Wed Apr 14 23:36:07 2010
@@ -75,6 +75,9 @@ Trunk -  Unreleased
     HIVE-1295. facilitate HBase bulk loads from Hive
     (John Sichi via namit)
 
+    HIVE-1002. multi-partition inserts
+    (Ning Zhang via namit)
+
   IMPROVEMENTS
     HIVE-983. Function from_unixtime takes long.
     (Ning Zhang via zshao)

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/common/FileUtils.java Wed Apr 14 23:36:07 2010
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.common;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.BitSet;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -33,11 +35,11 @@ public final class FileUtils {
   /**
    * Variant of Path.makeQualified that qualifies the input path against the
    * default file system indicated by the configuration
-   * 
+   *
    * This does not require a FileSystem handle in most cases - only requires the
    * Filesystem URI. This saves the cost of opening the Filesystem - which can
    * involve RPCs - as well as cause errors
-   * 
+   *
    * @param path
    *          path to be fully qualified
    * @param conf
@@ -80,4 +82,89 @@ public final class FileUtils {
   private FileUtils() {
     // prevent instantiation
   }
+
+
+  public static String makePartName(List<String> partCols,
+      List<String> vals) {
+
+    StringBuilder name = new StringBuilder();
+    for (int i = 0; i < partCols.size(); i++) {
+      if (i > 0) {
+        name.append(Path.SEPARATOR);
+      }
+      name.append(escapePathName((partCols.get(i)).toLowerCase()));
+      name.append('=');
+      name.append(escapePathName(vals.get(i)));
+    }
+    return name.toString();
+  }
+
+  // NOTE: This is for generating the internal path name for partitions. Users
+  // should always use the MetaStore API to get the path name for a partition.
+  // Users should not directly take partition values and turn it into a path
+  // name by themselves, because the logic below may change in the future.
+  //
+  // In the future, it's OK to add new chars to the escape list, and old data
+  // won't be corrupt, because the full path name in metastore is stored.
+  // In that case, Hive will continue to read the old data, but when it creates
+  // new partitions, it will use new names.
+  static BitSet charToEscape = new BitSet(128);
+  static {
+    for (char c = 0; c < ' '; c++) {
+      charToEscape.set(c);
+    }
+    char[] clist = new char[] { '"', '#', '%', '\'', '*', '/', ':', '=', '?',
+        '\\', '\u007F' };
+    for (char c : clist) {
+      charToEscape.set(c);
+    }
+  }
+
+  static boolean needsEscaping(char c) {
+    return c >= 0 && c < charToEscape.size() && charToEscape.get(c);
+  }
+
+  public static String escapePathName(String path) {
+
+    // __HIVE_DEFAULT_PARTITION__ is the system default value for a null or empty string.
+    // TODO: we should allow the user to specify the default partition or HDFS file location.
+    if (path == null || path.length() == 0) {
+      return "__HIVE_DEFAULT_PARTITION__";
+    }
+
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < path.length(); i++) {
+      char c = path.charAt(i);
+      if (needsEscaping(c)) {
+        sb.append('%');
+        sb.append(String.format("%1$02X", (int) c));
+      } else {
+        sb.append(c);
+      }
+    }
+    return sb.toString();
+  }
+
+  public static String unescapePathName(String path) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < path.length(); i++) {
+      char c = path.charAt(i);
+      if (c == '%' && i + 2 < path.length()) {
+        int code = -1;
+        try {
+          code = Integer.valueOf(path.substring(i + 1, i + 3), 16);
+        } catch (Exception e) {
+          code = -1;
+        }
+        if (code >= 0) {
+          sb.append((char) code);
+          i += 2;
+          continue;
+        }
+      }
+      sb.append(c);
+    }
+    return sb.toString();
+  }
+
 }
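
For reference (not part of this commit), a minimal sketch of how the FileUtils helpers added above behave. The demo class below is hypothetical; the expected outputs in the comments follow directly from the escape table and the makePartName logic in the diff.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.hive.common.FileUtils;

    // Not part of this commit: a small driver exercising the helpers added above.
    public class PartNameDemo {
      public static void main(String[] args) {
        List<String> cols = Arrays.asList("ds", "country");
        List<String> vals = Arrays.asList("2010-04-14", "US/East"); // '/' is on the escape list

        // Column names are lower-cased, values escaped, components joined by '/':
        // prints "ds=2010-04-14/country=US%2FEast"
        System.out.println(FileUtils.makePartName(cols, vals));

        // Escaping round-trips through unescapePathName:
        String escaped = FileUtils.escapePathName("a:b=c"); // "a%3Ab%3Dc"
        System.out.println(escaped);
        System.out.println(FileUtils.unescapePathName(escaped)); // "a:b=c"
      }
    }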

Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Apr 14 23:36:07 2010
@@ -84,6 +84,11 @@ public class HiveConf extends Configurat
     EXECPARALLEL("hive.exec.parallel", false), // parallel query launching
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8),
     HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true),
+    DYNAMICPARTITIONING("hive.exec.dynamic.partition", false),
+    DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict"),
+    DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
+    DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
+    DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
 
     // hadoop stuff
     HADOOPBIN("hadoop.bin.path", System.getenv("HADOOP_HOME") + "/bin/hadoop"),

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Wed Apr 14 23:36:07 2010
@@ -521,4 +521,34 @@
   <description>Remove extra map-reduce jobs if the data is already clustered by the same key which needs to be used again. This should always be set to true. Since it is a new feature, it has been made configurable.</description>
 </property>
 
+<property>
+  <name>hive.exec.dynamic.partition</name>
+  <value>false</value>
+  <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
+</property>
+
+<property>
+  <name>hive.exec.dynamic.partition.mode</name>
+  <value>strict</value>
+  <description>In strict mode, the user must specify at least one static partition, to guard against accidentally overwriting all partitions.</description>
+</property>
+
+<property>
+  <name>hive.exec.max.dynamic.partitions</name>
+  <value>1000</value>
+  <description>Maximum number of dynamic partitions allowed to be created in total.</description>
+</property>
+
+<property>
+  <name>hive.exec.max.dynamic.partitions.pernode</name>
+  <value>100</value>
+  <description>Maximum number of dynamic partitions allowed to be created in each mapper/reducer node.</description>
+</property>
+
+<property>
+  <name>hive.exec.default.partition.name</name>
+  <value>__HIVE_DEFAULT_PARTITION__</value>
+  <description>The default partition name used when the dynamic partition column value is null, an empty string, or any other value that cannot be escaped. This value must not contain any special character used in HDFS URIs (e.g., ':', '%', '/'). The user has to be aware that the dynamic partition values should not contain this value, to avoid confusion.</description>
+</property>
+
 </configuration>
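
For reference (not part of this commit), a minimal sketch that reads the new dynamic-partitioning settings. Only the ConfVars names come from this commit; the demo class, the use of the standard HiveConf accessors (getBoolVar/getIntVar/getVar), and the no-arg HiveConf constructor are assumptions about the surrounding API.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Not part of this commit: reads the dynamic-partitioning knobs defined above.
    public class DynPartConfDemo {
      public static void main(String[] args) {
        // HiveConf is expected to pick up hive-default.xml / hive-site.xml from the classpath.
        Configuration conf = new HiveConf();

        boolean enabled     = HiveConf.getBoolVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONING);
        String  mode        = HiveConf.getVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE);
        int     maxTotal    = HiveConf.getIntVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
        int     maxPerNode  = HiveConf.getIntVar(conf, HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE);
        String  defaultName = HiveConf.getVar(conf, HiveConf.ConfVars.DEFAULTPARTITIONNAME);

        System.out.println("dynamic partitioning enabled: " + enabled
            + ", mode: " + mode
            + ", max partitions: " + maxTotal
            + ", max per node: " + maxPerNode
            + ", default partition name: " + defaultName);
      }
    }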

Modified: hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (original)
+++ hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java Wed Apr 14 23:36:07 2010
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.metastore
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -77,10 +77,10 @@ public class Warehouse {
    * System URI always contains the canonical DNS name of the Namenode.
    * Subsequently, operations on paths with raw ip addresses cause an exception
    * since they don't match the file system URI.
-   * 
+   *
    * This routine solves this problem by replacing the scheme and authority of a
    * path with the scheme and authority of the FileSystem that it maps to.
-   * 
+   *
    * @param path
    *          Path to be canonicalized
    * @return Path with canonical scheme and authority
@@ -162,6 +162,7 @@ public class Warehouse {
     return false;
   }
 
+  /*
   // NOTE: This is for generating the internal path name for partitions. Users
   // should always use the MetaStore API to get the path name for a partition.
   // Users should not directly take partition values and turn it into a path
@@ -186,43 +187,23 @@ public class Warehouse {
   static boolean needsEscaping(char c) {
     return c >= 0 && c < charToEscape.size() && charToEscape.get(c);
   }
+  */
 
   static String escapePathName(String path) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < path.length(); i++) {
-      char c = path.charAt(i);
-      if (needsEscaping(c)) {
-        sb.append('%');
-        sb.append(String.format("%1$02X", (int) c));
-      } else {
-        sb.append(c);
-      }
-    }
-    return sb.toString();
+    return FileUtils.escapePathName(path);
   }
 
   static String unescapePathName(String path) {
-    StringBuilder sb = new StringBuilder();
-    for (int i = 0; i < path.length(); i++) {
-      char c = path.charAt(i);
-      if (c == '%' && i + 2 < path.length()) {
-        int code = -1;
-        try {
-          code = Integer.valueOf(path.substring(i + 1, i + 3), 16);
-        } catch (Exception e) {
-          code = -1;
-        }
-        if (code >= 0) {
-          sb.append((char) code);
-          i += 2;
-          continue;
-        }
-      }
-      sb.append(c);
-    }
-    return sb.toString();
+    return FileUtils.unescapePathName(path);
   }
 
+  /**
+   * Given a partition specification, return the path corresponding to the
+   * partition spec. By default, the specification does not include dynamic partitions.
+   * @param spec
+   * @return string representation of the partition specification.
+   * @throws MetaException
+   */
   public static String makePartName(Map<String, String> spec)
       throws MetaException {
     StringBuilder suffixBuf = new StringBuilder();
@@ -238,43 +219,60 @@ public class Warehouse {
     return suffixBuf.toString();
   }
 
+  /**
+   * Given a dynamic partition specification, return the path corresponding to the
+   * static part of partition specification. This is basically a copy of makePartName
+   * but we get rid of MetaException since it is not serializable.
+   * @param spec
+   * @return string representation of the static part of the partition specification.
+   */
+  public static String makeDynamicPartName(Map<String, String> spec) {
+    StringBuilder suffixBuf = new StringBuilder();
+    for (Entry<String, String> e : spec.entrySet()) {
+      if (e.getValue() != null && e.getValue().length() > 0) {
+        suffixBuf.append(escapePathName(e.getKey()));
+        suffixBuf.append('=');
+        suffixBuf.append(escapePathName(e.getValue()));
+        suffixBuf.append(Path.SEPARATOR);
+      } else { // stop once we see a dynamic partition
+        break;
+      }
+    }
+    return suffixBuf.toString();
+  }
+
   static final Pattern pat = Pattern.compile("([^/]+)=([^/]+)");
 
   public static LinkedHashMap<String, String> makeSpecFromName(String name)
       throws MetaException {
-    LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
     if (name == null || name.isEmpty()) {
       throw new MetaException("Partition name is invalid. " + name);
     }
+    LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
+    makeSpecFromName(partSpec, new Path(name));
+    return partSpec;
+  }
+
+  public static void makeSpecFromName(Map<String, String> partSpec, Path currPath) {
     List<String[]> kvs = new ArrayList<String[]>();
-    Path currPath = new Path(name);
     do {
       String component = currPath.getName();
       Matcher m = pat.matcher(component);
       if (m.matches()) {
         String k = unescapePathName(m.group(1));
         String v = unescapePathName(m.group(2));
-
-        if (partSpec.containsKey(k)) {
-          throw new MetaException("Partition name is invalid. Key " + k
-              + " defined at two levels");
-        }
         String[] kv = new String[2];
         kv[0] = k;
         kv[1] = v;
         kvs.add(kv);
-      } else {
-        throw new MetaException("Partition name is invalid. " + name);
       }
       currPath = currPath.getParent();
     } while (currPath != null && !currPath.getName().isEmpty());
 
-    // reverse the list since we checked the part from leaf dir to table's base
-    // dir
+    // reverse the list since we checked the part from leaf dir to table's base dir
     for (int i = kvs.size(); i > 0; i--) {
       partSpec.put(kvs.get(i - 1)[0], kvs.get(i - 1)[1]);
     }
-    return partSpec;
   }
 
   public Path getPartitionPath(String dbName, String tableName,
@@ -307,16 +305,10 @@ public class Warehouse {
     if ((partCols.size() != vals.size()) || (partCols.size() == 0)) {
       throw new MetaException("Invalid partition key & values");
     }
-    StringBuilder name = new StringBuilder();
-    for (int i = 0; i < partCols.size(); i++) {
-      if (i > 0) {
-        name.append(Path.SEPARATOR);
-      }
-      name.append(escapePathName((partCols.get(i)).getName().toLowerCase()));
-      name.append('=');
-      name.append(escapePathName(vals.get(i)));
+    List<String> colNames = new ArrayList<String>();
+    for (FieldSchema col: partCols) {
+      colNames.add(col.getName());
     }
-    return name.toString();
+    return FileUtils.makePartName(colNames, vals);
   }
-
 }
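
For reference (not part of this commit), a minimal sketch of makeDynamicPartName on a partition spec that mixes static and dynamic columns. The demo class is hypothetical; the expected output in the comment follows from the method body added above.

    import java.util.LinkedHashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.Warehouse;

    // Not part of this commit: shows what makeDynamicPartName returns for a spec
    // that mixes static and dynamic partition columns (dynamic values are null/empty).
    public class DynPartNameDemo {
      public static void main(String[] args) {
        Map<String, String> spec = new LinkedHashMap<String, String>();
        spec.put("ds", "2010-04-14"); // static partition column
        spec.put("hr", null);         // dynamic partition column -- the loop stops here

        // Only the static prefix is materialized, with a trailing separator:
        // prints "ds=2010-04-14/"
        System.out.println(Warehouse.makeDynamicPartName(spec));
      }
    }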

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractMapJoinOperator.java Wed Apr 14 23:36:07 2010
@@ -20,30 +20,20 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator.MapJoinObjectCtx;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectKey;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
 import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.util.ReflectionUtils;
 
 
 public abstract class AbstractMapJoinOperator <T extends MapJoinDesc> extends CommonJoinOperator<T> implements
@@ -67,7 +57,7 @@ public abstract class AbstractMapJoinOpe
   transient int mapJoinRowsKey; // rows for a given key
 
   protected transient RowContainer<ArrayList<Object>> emptyList = null;
-  
+
   transient int numMapRowsRead;
 
   private static final transient String[] FATAL_ERR_MSG = {
@@ -78,7 +68,7 @@ public abstract class AbstractMapJoinOpe
 
   transient boolean firstRow;
   transient int heartbeatInterval;
-  
+
   public AbstractMapJoinOperator() {
   }
 
@@ -138,7 +128,7 @@ public abstract class AbstractMapJoinOpe
     errMsg.append("Operator " + getOperatorId() + " (id=" + id + "): "
         + FATAL_ERR_MSG[(int) counterCode]);
   }
-  
+
   protected void reportProgress() {
     // Send some status periodically
     numMapRowsRead++;

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/CommonJoinOperator.java Wed Apr 14 23:36:07 2010
@@ -24,8 +24,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,8 +44,8 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -142,7 +142,7 @@ public abstract class CommonJoinOperator
 
   public CommonJoinOperator() {
   }
-  
+
   public CommonJoinOperator(CommonJoinOperator<T> clone) {
     this.joinEmitInterval = clone.joinEmitInterval;
     this.joinCacheSize = clone.joinCacheSize;
@@ -155,9 +155,9 @@ public abstract class CommonJoinOperator
     this.operatorId = clone.operatorId;
     this.storage = clone.storage;
     this.condn = clone.condn;
-    
+
     this.setSchema(clone.getSchema());
-    
+
     this.alias = clone.alias;
     this.beginTime = clone.beginTime;
     this.inputRows = clone.inputRows;
@@ -793,7 +793,7 @@ public abstract class CommonJoinOperator
 
   /**
    * Forward a record of join results.
-   * 
+   *
    * @throws HiveException
    */
   @Override
@@ -880,7 +880,7 @@ public abstract class CommonJoinOperator
 
   /**
    * All done.
-   * 
+   *
    */
   @Override
   public void closeOp(boolean abort) throws HiveException {

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Wed Apr 14 23:36:07 2010
@@ -721,16 +721,27 @@ public class ExecDriver extends Task<Map
       }
     }
 
+    // get the list of Dynamic partition paths
+    ArrayList<String> dpPaths = new ArrayList<String>();
     try {
       if (rj != null) {
+        JobCloseFeedBack feedBack = new JobCloseFeedBack();
         if (work.getAliasToWork() != null) {
           for (Operator<? extends Serializable> op : work.getAliasToWork()
               .values()) {
-            op.jobClose(job, success);
+            op.jobClose(job, success, feedBack);
+            ArrayList<Object> dirs = feedBack.get(JobCloseFeedBack.FeedBackType.DYNAMIC_PARTITIONS);
+            if (dirs != null) {
+              for (Object o: dirs) {
+                if (o instanceof String) {
+                  dpPaths.add((String)o);
+                }
+              }
+            }
           }
         }
         if (work.getReducer() != null) {
-          work.getReducer().jobClose(job, success);
+          work.getReducer().jobClose(job, success, feedBack);
         }
       }
     } catch (Exception e) {
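
The new JobCloseFeedBack class is used above to pass the dynamic-partition root paths from operators back to the driver at job close, but its source appears in a later part of this patch. The following is a hypothetical sketch inferred only from the call sites shown here (new JobCloseFeedBack(), add(type, value), get(type)); the actual implementation may differ.

    import java.util.ArrayList;
    import java.util.EnumMap;
    import java.util.Map;

    // Hypothetical sketch only: inferred from the ExecDriver and FileSinkOperator.jobClose
    // call sites in this diff, not from the real JobCloseFeedBack.java added by the commit.
    public class JobCloseFeedBack {

      public enum FeedBackType {
        DYNAMIC_PARTITIONS
      }

      // one list of values per feedback type
      private final Map<FeedBackType, ArrayList<Object>> feedback =
          new EnumMap<FeedBackType, ArrayList<Object>>(FeedBackType.class);

      // record a value (e.g. the root path of the dynamic partitions) under the given type
      public void add(FeedBackType type, Object value) {
        ArrayList<Object> values = feedback.get(type);
        if (values == null) {
          values = new ArrayList<Object>();
          feedback.put(type, values);
        }
        values.add(value);
      }

      // return everything recorded under the given type, or null if nothing was recorded
      public ArrayList<Object> get(FeedBackType type) {
        return feedback.get(type);
      }
    }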

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExplainTask.java Wed Apr 14 23:36:07 2010
@@ -28,9 +28,9 @@ import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -40,7 +40,7 @@ import org.apache.hadoop.util.StringUtil
 
 /**
  * ExplainTask implementation.
- * 
+ *
  **/
 public class ExplainTask extends Task<ExplainWork> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -111,6 +111,8 @@ public class ExplainTask extends Task<Ex
       } else if (ent.getValue() instanceof Serializable) {
         out.println();
         outputPlan((Serializable) ent.getValue(), out, extended, indent + 2);
+      } else {
+        out.println();
       }
     }
   }
@@ -156,7 +158,7 @@ public class ExplainTask extends Task<Ex
       return true;
     }
 
-    if (val.getClass().isPrimitive()) {
+    if (val != null && val.getClass().isPrimitive()) {
       return true;
     }
 
@@ -370,7 +372,7 @@ public class ExplainTask extends Task<Ex
       List<Task<? extends Serializable>> rootTasks, int indent) throws Exception {
     out.print(indentString(indent));
     out.println("STAGE PLANS:");
-    HashSet<Task<? extends Serializable>> displayedSet = 
+    HashSet<Task<? extends Serializable>> displayedSet =
       new HashSet<Task<? extends Serializable>>();
     for (Task<? extends Serializable> rootTask : rootTasks) {
       outputPlan(rootTask, out, work.getExtended(), displayedSet, indent + 2);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Wed Apr 14 23:36:07 2010
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -29,19 +31,28 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack.FeedBackType;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -55,6 +66,25 @@ import org.apache.hadoop.util.Reflection
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
 
+  protected transient HashMap<String, FSPaths> valToPaths;
+  protected transient int numDynParts;
+  protected transient List<String> dpColNames;
+  protected transient DynamicPartitionCtx dpCtx;
+  protected transient boolean isCompressed;
+  protected transient Path parent;
+  protected transient HiveOutputFormat<?, ?> hiveOutputFormat;
+  protected transient Path specPath;
+  protected transient int dpStartCol; // start column # for DP columns
+  protected transient List<String> dpVals; // array of values corresponding to DP columns
+  protected transient List<Object> dpWritables;
+  protected transient RecordWriter[] rowOutWriters;      // row specific RecordWriters
+  protected transient int maxPartitions;
+
+  private static final transient String[] FATAL_ERR_MSG = {
+    null, // counter value 0 means no error
+    "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode."
+  };
+
   /**
    * RecordWriter.
    *
@@ -65,6 +95,112 @@ public class FileSinkOperator extends Te
     void close(boolean abort) throws IOException;
   }
 
+  public class FSPaths implements Cloneable {
+    Path tmpPath;
+    Path[] outPaths;
+    Path[] finalPaths;
+    RecordWriter[] outWriters;
+
+    public FSPaths() {
+    }
+
+    public FSPaths(Path specPath) {
+      tmpPath = Utilities.toTempPath(specPath);
+      outPaths   = new Path[numFiles];
+      finalPaths = new Path[numFiles];
+      outWriters = new RecordWriter[numFiles];
+    }
+
+    /**
+     * Append a subdirectory to the tmp path.
+     * @param dp subdirectory name
+     */
+    public void appendTmpPath(String dp) {
+      tmpPath = new Path(tmpPath, dp);
+    }
+
+    /**
+     * Return the temporary output path for a task under tmpPath.
+     */
+    public Path getOutPath(String taskId) {
+      return getOutPath(taskId, this.tmpPath);
+    }
+
+    /**
+     * Return the temporary output path for a task under the given directory.
+     */
+    public Path getOutPath(String taskId, Path tmp) {
+      return new Path(tmp, Utilities.toTempPath(taskId));
+    }
+
+    /**
+     * Return the final path for a task under tmpPath.
+     */
+    public Path getFinalPath(String taskId) {
+      return getFinalPath(taskId, this.tmpPath, null);
+    }
+
+    /**
+     * Return the final path for a task under the given directory, with an optional extension.
+     */
+    public Path getFinalPath(String taskId, Path tmpPath, String extension) {
+      if (extension != null) {
+        return new Path(tmpPath, taskId + extension);
+      } else {
+        return new Path(tmpPath, taskId);
+      }
+    }
+
+    public void setOutWriters(RecordWriter[] out) {
+      outWriters = out;
+    }
+
+    public RecordWriter[] getOutWriters() {
+      return outWriters;
+    }
+
+    public void closeWriters(boolean abort) throws HiveException {
+      for (int idx = 0; idx < outWriters.length; idx++) {
+        if (outWriters[idx] != null) {
+          try {
+            outWriters[idx].close(abort);
+          } catch (IOException e) {
+            throw new HiveException(e);
+          }
+        }
+      }
+    }
+
+    private void commit(FileSystem fs) throws HiveException {
+      for (int idx = 0; idx < outPaths.length; ++idx) {
+        try {
+          if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+            throw new HiveException("Unable to rename output to: "
+                + finalPaths[idx]);
+          }
+        } catch (IOException e) {
+          throw new HiveException(e + "Unable to rename output to: "
+              + finalPaths[idx]);
+        }
+      }
+    }
+
+    public void abortWriters(FileSystem fs, boolean abort, boolean delete) throws HiveException {
+      for (int idx = 0; idx < outWriters.length; idx++) {
+        if (outWriters[idx] != null) {
+          try {
+            outWriters[idx].close(abort);
+          	if (delete) {
+          	  fs.delete(outPaths[idx], true);
+          	}
+          } catch (IOException e) {
+            throw new HiveException(e);
+          }
+        }
+      }
+    }
+  } // class FSPaths
+
   private static final long serialVersionUID = 1L;
   protected transient FileSystem fs;
   protected transient Serializer serializer;
@@ -82,14 +218,15 @@ public class FileSinkOperator extends Te
   private transient int      totalFiles;
   private transient int      numFiles;
   private transient boolean  multiFileSpray;
-  private transient Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+  private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
 
-  private transient RecordWriter[] outWriters;
-  private transient Path[] outPaths;
-  private transient Path[] finalPaths;
   private transient ObjectInspector[] partitionObjectInspectors;
   private transient HivePartitioner<HiveKey, Object> prtner;
-  private transient HiveKey key = new HiveKey();
+  private transient final HiveKey key = new HiveKey();
+  private transient Configuration hconf;
+  private transient FSPaths fsp;
+  private transient boolean bDynParts;
+  private transient SubStructObjectInspector subSetOI;
 
   /**
    * TableIdEnum.
@@ -114,37 +251,128 @@ public class FileSinkOperator extends Te
   }
 
   protected transient boolean autoDelete = false;
+  protected transient JobConf jc;
+  Class<? extends Writable> outputClass;
+  /*
+  String specPath;
+  Path tmpPath;
+  */
+  String taskId;
+
+  private boolean filesCreated = false;
+ @Override
+  protected void initializeOp(Configuration hconf) throws HiveException {
+    try {
+      this.hconf       = hconf;
+      filesCreated     = false;
+      isNativeTable    = !conf.getTableInfo().isNonNative();
+      multiFileSpray   = conf.isMultiFileSpray();
+      totalFiles       = conf.getTotalFiles();
+      numFiles         = conf.getNumFiles();
+      dpCtx            = conf.getDynPartCtx();
+      valToPaths       = new HashMap<String, FSPaths>();
+      taskId           = Utilities.getTaskId(hconf);
+      specPath         = new Path(conf.getDirName());
+      fs               = specPath.getFileSystem(hconf);
+      hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+      isCompressed     = conf.getCompressed();
+      parent           = Utilities.toTempPath(conf.getDirName());
+      serializer       = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
+      serializer.initialize(null, conf.getTableInfo().getProperties());
+      outputClass = serializer.getSerializedClass();
+
+      if (hconf instanceof JobConf) {
+        jc = (JobConf) hconf;
+      } else {
+        // test code path
+        jc = new JobConf(hconf, ExecDriver.class);
+      }
+
+      if (multiFileSpray) {
+        partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
+        int i = 0;
+        for (ExprNodeDesc e : conf.getPartitionCols()) {
+          partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
+        }
 
-  private void commit(int idx) throws IOException {
-    if (isNativeTable) {
-      if (!fs.rename(outPaths[idx], finalPaths[idx])) {
-        throw new IOException("Unable to rename output to: "
-          + finalPaths[idx]);
+        partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
+        prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
+            jc.getPartitionerClass(), null);
+      }
+      int id = conf.getDestTableId();
+      if ((id != 0) && (id <= TableIdEnum.values().length)) {
+        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
+        tabIdEnum = TableIdEnum.valueOf(enumName);
+        row_count = new LongWritable();
+        statsMap.put(tabIdEnum, row_count);
       }
+
+      if (dpCtx != null) {
+        dpSetup();
+      }
+
+      if (!bDynParts) {
+        fsp = new FSPaths(specPath);
+
+      	// Create all the files - this is required because empty files need to be created for
+      	// empty buckets
+      	createBucketFiles(fsp);
+      	valToPaths.put("", fsp); // special entry for non-DP case
+      }
+
+      initializeChildren(hconf);
+    } catch (HiveException e) {
+      throw e;
+    } catch (Exception e) {
+      e.printStackTrace();
+      throw new HiveException(e);
     }
-    LOG.info("Committed " + outPaths[idx] + " to output file: " + finalPaths[idx]);
   }
 
-  private boolean filesCreated = false;
-  private void openFiles(Configuration hconf) throws HiveException {
-    if (filesCreated)
-      return;
-    try {
-      String specPath = conf.getDirName();
-      Path tmpPath = Utilities.toTempPath(specPath);
-      Set<Integer> seenBuckets = new HashSet<Integer>();
-      fs = (new Path(specPath)).getFileSystem(hconf);
-      HiveOutputFormat<?, ?> hiveOutputFormat = conf.getTableInfo()
-        .getOutputFileFormatClass().newInstance();
-      boolean isCompressed = conf.getCompressed();
-      Path parent = Utilities.toTempPath(specPath);
-      final Class<? extends Writable> outputClass = serializer.getSerializedClass();
+  /**
+   * Set up for dynamic partitioning including a new ObjectInspector for the output row.
+   */
+  private void dpSetup() {
+
+    this.bDynParts      = false;
+    this.numDynParts    = dpCtx.getNumDPCols();
+    this.dpColNames     = dpCtx.getDPColNames();
+    this.maxPartitions  = dpCtx.getMaxPartitionsPerNode();
+
+    assert numDynParts == dpColNames.size():
+      "number of dynamic partitions should be the same as the size of DP mapping";
+
+    if (dpColNames != null && dpColNames.size() > 0) {
+      this.bDynParts = true;
+      assert inputObjInspectors.length == 1: "FileSinkOperator should have 1 parent, but it has "
+        + inputObjInspectors.length;
+      StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[0];
+      // remove the last dpMapping.size() columns from the OI
+      List<? extends StructField> fieldOI = soi.getAllStructFieldRefs();
+      ArrayList<ObjectInspector> newFieldsOI = new ArrayList<ObjectInspector>();
+      ArrayList<String> newFieldsName = new ArrayList<String>();
+      this.dpStartCol = 0;
+      for (StructField sf: fieldOI) {
+        String fn = sf.getFieldName();
+        if (!dpCtx.getInputToDPCols().containsKey(fn)) {
+          newFieldsOI.add(sf.getFieldObjectInspector());
+          newFieldsName.add(sf.getFieldName());
+          this.dpStartCol++;
+        }
+      }
+      assert newFieldsOI.size() > 0: "new Fields ObjectInspector is empty";
+
+      this.subSetOI = new SubStructObjectInspector(soi, 0, this.dpStartCol);
+      this.dpVals = new ArrayList<String>(numDynParts);
+      this.dpWritables = new ArrayList<Object>(numDynParts);
+    }
+  }
 
-      // Create all the files - this is required because empty files need to be created for empty buckets
+  private void createBucketFiles(FSPaths fsp) throws HiveException {
+    try {
       int filesIdx = 0;
+      Set<Integer> seenBuckets = new HashSet<Integer>();
       for (int idx = 0; idx < totalFiles; idx++) {
-        String taskId = Utilities.getTaskId(hconf);
-
         if (this.getExecContext() != null && this.getExecContext().getFileId() != -1) {
           LOG.info("replace taskId from execContext ");
 
@@ -166,49 +394,72 @@ public class FileSinkOperator extends Te
             int currReducer = Integer.valueOf(Utilities.getTaskIdFromFilename(Utilities.getTaskId(hconf)));
 
             int reducerIdx = prtner.getPartition(key, null, numReducers);
-            if (currReducer != reducerIdx)
+            if (currReducer != reducerIdx) {
               continue;
+            }
           }
 
           int bucketNum = prtner.getBucket(key, null, totalFiles);
-          if (seenBuckets.contains(bucketNum))
+          if (seenBuckets.contains(bucketNum)) {
             continue;
+          }
           seenBuckets.add(bucketNum);
 
           bucketMap.put(bucketNum, filesIdx);
           taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
         }
-
         if (isNativeTable) {
-          finalPaths[filesIdx] = new Path(tmpPath, taskId);
-          LOG.info("Final Path: FS " + finalPaths[filesIdx]);
-          outPaths[filesIdx] = new Path(tmpPath, Utilities.toTempPath(taskId));
-          LOG.info("Writing to temp file: FS " + outPaths[filesIdx]);
+          fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
+          LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
+          fsp.outPaths[filesIdx] = fsp.getOutPath(taskId);
+          LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
         } else {
-          finalPaths[filesIdx] = outPaths[filesIdx] = new Path(specPath);
+          fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
         }
+        try {
+          // The reason to keep these instead of using
+        	// OutputFormat.getRecordWriter() is that
+        	// getRecordWriter does not give us enough control over the file name that
+        	// we create.
+          if (!bDynParts) {
+            fsp.finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(
+                parent, taskId, jc, hiveOutputFormat, isCompressed, fsp.finalPaths[filesIdx]);
+          } else {
+            String extension = null;
+            if (hiveOutputFormat instanceof HiveIgnoreKeyTextOutputFormat) {
+              extension = Utilities.getFileExtension(jc, isCompressed);
+            }
+            fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
+          }
 
-        // The reason to keep these instead of using
-        // OutputFormat.getRecordWriter() is that
-        // getRecordWriter does not give us enough control over the file name that
-        // we create.
-        finalPaths[filesIdx] = HiveFileFormatUtils.getOutputFormatFinalPath(parent, taskId, jc,
-                                                                            hiveOutputFormat, isCompressed, finalPaths[filesIdx]);
-        LOG.info("New Final Path: FS " + finalPaths[filesIdx]);
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw new HiveException(e);
+        }
+        LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
 
-        Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
-        outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf
-                                                                       .getTableInfo(), outputClass, conf, outPaths[filesIdx]);
+      	if (isNativeTable) {
+      	  try {
+      	    // in recent hadoop versions, use deleteOnExit to clean tmp files.
+      	    autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(
+      	        fs, fsp.outPaths[filesIdx]);
+      	  } catch (IOException e) {
+      	    throw new HiveException(e);
+      	  }
+      	}
 
+        Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
+        // only create bucket files if there are no dynamic partitions;
+        // buckets of dynamic partitions will be created for each newly created partition
+        fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
+              jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
         filesIdx++;
       }
-
       assert filesIdx == numFiles;
 
       // in recent hadoop versions, use deleteOnExit to clean tmp files.
       if (isNativeTable) {
-        autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs,
-                                                                        outPaths[0]);
+        autoDelete = ShimLoader.getHadoopShims().fileSystemDeleteOnExit(fs, fsp.outPaths[0]);
       }
     } catch (HiveException e) {
       throw e;
@@ -220,68 +471,13 @@ public class FileSinkOperator extends Te
     filesCreated = true;
   }
 
-  private Configuration hconf;
-  private JobConf jc;
-
-  @Override
-  protected void initializeOp(Configuration hconf) throws HiveException {
-    try {
-      filesCreated = false;
-      this.hconf = hconf;
-      serializer = (Serializer) conf.getTableInfo().getDeserializerClass()
-          .newInstance();
-      serializer.initialize(null, conf.getTableInfo().getProperties());
-      isNativeTable = !conf.getTableInfo().isNonNative();
-
-      if (hconf instanceof JobConf) {
-        jc = (JobConf) hconf;
-      } else {
-        // test code path
-        jc = new JobConf(hconf, ExecDriver.class);
-      }
-
-      multiFileSpray = conf.isMultiFileSpray();
-      totalFiles = conf.getTotalFiles();
-      numFiles   = conf.getNumFiles();
-
-      if (multiFileSpray) {
-        partitionEval = new ExprNodeEvaluator[conf.getPartitionCols().size()];
-        int i = 0;
-        for (ExprNodeDesc e : conf.getPartitionCols()) {
-          partitionEval[i++] = ExprNodeEvaluatorFactory.get(e);
-        }
-
-        partitionObjectInspectors = initEvaluators(partitionEval, outputObjInspector);
-        prtner = (HivePartitioner<HiveKey, Object>)ReflectionUtils.newInstance(jc.getPartitionerClass(), null);
-      }
-
-      outWriters = new RecordWriter[numFiles];
-      outPaths   = new Path[numFiles];
-      finalPaths = new Path[numFiles];
-
-      int id = conf.getDestTableId();
-      if ((id != 0) && (id <= TableIdEnum.values().length)) {
-        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
-        tabIdEnum = TableIdEnum.valueOf(enumName);
-        row_count = new LongWritable();
-        statsMap.put(tabIdEnum, row_count);
-      }
-
-      initializeChildren(hconf);
-    } catch (HiveException e) {
-      throw e;
-    } catch (Exception e) {
-      e.printStackTrace();
-      throw new HiveException(e);
-    }
-  }
-
   Writable recordValue;
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
-    if (!filesCreated)
-      openFiles(hconf);
+    if (!bDynParts && !filesCreated) {
+      createBucketFiles(fsp);
+    }
 
     // Since File Sink is a terminal operator, forward is not called - so,
     // maintain the number of output rows explicitly
@@ -297,14 +493,41 @@ public class FileSinkOperator extends Te
       if (reporter != null) {
         reporter.progress();
       }
-      // user SerDe to serialize r, and write it out
-      recordValue = serializer.serialize(row, inputObjInspectors[tag]);
+
+      // if DP is enabled, get the final output writers and prepare the real output row
+      assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT:
+        "input object inspector is not struct";
+
+      if (bDynParts) {
+        // copy the DP column values from the input row to dpVals
+        dpVals.clear();
+        dpWritables.clear();
+        ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts,
+            (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+        // get a set of RecordWriter based on the DP column values
+        // pass the null value along to the escaping process to determine what the dir should be
+        for (Object o: dpWritables) {
+          if (o == null || o.toString().length() == 0) {
+            dpVals.add(dpCtx.getDefaultPartitionName());
+          } else {
+            dpVals.add(o.toString());
+          }
+        }
+        rowOutWriters = getDynOutWriters(dpVals);
+        // use SubStructObjectInspector to serialize the non-partitioning columns in the input row
+        recordValue = serializer.serialize(row, subSetOI);
+      } else {
+        rowOutWriters = fsp.outWriters;
+        // use SerDe to serialize r, and write it out
+        recordValue = serializer.serialize(row, inputObjInspectors[0]);
+      }
+
       if (row_count != null) {
         row_count.set(row_count.get() + 1);
       }
 
       if (!multiFileSpray) {
-        outWriters[0].write(recordValue);
+        rowOutWriters[0].write(recordValue);
       }
       else {
         int keyHashCode = 0;
@@ -316,7 +539,7 @@ public class FileSinkOperator extends Te
         key.setHashCode(keyHashCode);
         int bucketNum = prtner.getBucket(key, null, totalFiles);
         int idx = bucketMap.get(bucketNum);
-        outWriters[bucketMap.get(bucketNum)].write(recordValue);
+        rowOutWriters[idx].write(recordValue);
       }
     } catch (IOException e) {
       throw new HiveException(e);
@@ -325,35 +548,70 @@ public class FileSinkOperator extends Te
     }
   }
 
+  private RecordWriter[] getDynOutWriters(List<String> row) throws HiveException {
+
+    RecordWriter[] rw;  // return value
+
+    // get the path corresponding to the dynamic partition columns,
+    String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
+
+    if (dpDir != null) {
+      FSPaths fsp2 = valToPaths.get(dpDir);
+      if (fsp2 == null) {
+        // check # of dp
+        if (valToPaths.size() > maxPartitions) {
+          // throw fatal error
+          incrCounter(fatalErrorCntr, 1);
+          fatalError = true;
+          LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
+        }
+        fsp2 = new FSPaths(specPath);
+        fsp2.tmpPath = new Path(fsp2.tmpPath, dpDir);
+        createBucketFiles(fsp2);
+        valToPaths.put(dpDir, fsp2);
+      }
+      rw = fsp2.outWriters;
+    } else {
+      rw = fsp.outWriters;
+    }
+    return rw;
+  }
+
+  // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
+  // return the relative path corresponding to the row.
+  private String getDynPartDirectory(List<String> row, List<String> dpColNames, int numDynParts) {
+    assert row.size() == numDynParts && numDynParts == dpColNames.size():
+      "data length is different from num of DP columns";
+    return FileUtils.makePartName(dpColNames, row);
+  }
+
+  @Override
+  protected void fatalErrorMessage(StringBuilder errMsg, long counterCode) {
+    errMsg.append("Operator ").append(getOperatorId()).append(" (id=").append(id).append("): ");
+    errMsg.append(counterCode > FATAL_ERR_MSG.length - 1 ?
+        "fatal error":
+        FATAL_ERR_MSG[(int) counterCode]);
+  }
+
   @Override
   public void closeOp(boolean abort) throws HiveException {
-    if (!filesCreated)
-      openFiles(hconf);
+    if (!bDynParts && !filesCreated) {
+      createBucketFiles(fsp);
+    }
 
     if (!abort) {
-      for (int idx = 0; idx < numFiles; idx++) {
-        if (outWriters[idx] != null) {
-          try {
-            outWriters[idx].close(abort);
-            commit(idx);
-          } catch (IOException e) {
-            throw new HiveException(e);
-          }
+      for (FSPaths fsp: valToPaths.values()) {
+        fsp.closeWriters(abort);
+        if (isNativeTable) {
+          fsp.commit(fs);
         }
       }
     } else {
       // Will come here if an Exception was thrown in map() or reduce().
       // Hadoop always call close() even if an Exception was thrown in map() or
       // reduce().
-      try {
-        for (int idx = 0; idx < numFiles; idx++) {
-          outWriters[idx].close(abort);
-          if (!autoDelete && isNativeTable) {
-            fs.delete(outPaths[idx], true);
-          }
-        }
-      } catch (Exception e) {
-        e.printStackTrace();
+      for (FSPaths fsp: valToPaths.values()) {
+        fsp.abortWriters(fs, abort, !autoDelete && isNativeTable);
       }
     }
   }
@@ -367,20 +625,25 @@ public class FileSinkOperator extends Te
   }
 
   @Override
-  public void jobClose(Configuration hconf, boolean success) throws HiveException {
+  public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
+      throws HiveException {
     try {
       if ((conf != null) && isNativeTable) {
         String specPath = conf.getDirName();
-        FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+        DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
+        mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx);
+        // send back the root path corresponding to the DP columns
+        feedBack.add(FeedBackType.DYNAMIC_PARTITIONS, specPath);
       }
     } catch (IOException e) {
       throw new HiveException(e);
     }
-    super.jobClose(hconf, success);
+    super.jobClose(hconf, success, feedBack);
   }
 
-  public static void mvFileToFinalPath(String specPath, Configuration hconf,
-      boolean success, Log log) throws IOException, HiveException {
+  public void mvFileToFinalPath(String specPath, Configuration hconf,
+      boolean success, Log log, DynamicPartitionCtx dpCtx) throws IOException, HiveException {
+
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
@@ -394,7 +657,13 @@ public class FileSinkOperator extends Te
         log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
         Utilities.rename(fs, tmpPath, intermediatePath);
         // Step2: remove any tmp file or double-committed output files
-        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+        ArrayList<String> emptyBuckets =
+          Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+        // create empty buckets if necessary
+        if (emptyBuckets.size() > 0) {
+          createEmptyBuckets(hconf, emptyBuckets);
+        }
+
         // Step3: move to the file destination
         log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
         Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
@@ -404,6 +673,51 @@ public class FileSinkOperator extends Te
     }
   }
 
+  /**
+   * Check the existence of buckets according to the bucket specification and create empty
+   * bucket files where needed.
+   * @param hconf the job configuration.
+   * @param paths the final paths of the bucket files that should be created empty.
+   * @throws HiveException
+   * @throws IOException
+   */
+  private void createEmptyBuckets(Configuration hconf, ArrayList<String> paths)
+      throws HiveException, IOException {
+
+    JobConf jc;
+    if (hconf instanceof JobConf) {
+      jc = new JobConf(hconf);
+    } else {
+      // test code path
+      jc = new JobConf(hconf, ExecDriver.class);
+    }
+    HiveOutputFormat<?, ?> hiveOutputFormat = null;
+    Class<? extends Writable> outputClass = null;
+    boolean isCompressed = conf.getCompressed();
+    TableDesc tableInfo = conf.getTableInfo();
+    try {
+      Serializer serializer = (Serializer) tableInfo.getDeserializerClass().newInstance();
+      serializer.initialize(null, tableInfo.getProperties());
+      outputClass = serializer.getSerializedClass();
+      hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+    } catch (SerDeException e) {
+      throw new HiveException(e);
+    } catch (InstantiationException e) {
+      throw new HiveException(e);
+    } catch (IllegalAccessException e) {
+      throw new HiveException(e);
+    }
+
+    for (String p: paths) {
+      Path path = new Path(p);
+      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+          jc, hiveOutputFormat, outputClass, isCompressed, tableInfo.getProperties(), path);
+      writer.close(false);
+      LOG.info("created empty bucket for enforcing bucketing at " + path);
+    }
+  }
+
   @Override
   public int getType() {
     return OperatorType.FILESINK;

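A note on the FileSinkOperator changes above: each distinct combination of dynamic-partition
column values is mapped to a relative directory (via FileUtils.makePartName) and a set of
output writers is created lazily and cached per directory in valToPaths. The code below is a
minimal, self-contained sketch of that routing idea only; the class, the Writer interface and
the makePartName helper are simplified stand-ins, not Hive's actual APIs (the real helper also
escapes special characters, and the real operator raises a fatal-error counter instead of
throwing when the partition limit is exceeded).

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of per-row dynamic-partition routing: each distinct combination of
// DP column values maps to a relative directory such as "ds=2010-04-14/hr=12", and a
// writer is created lazily and cached per directory.
public class DynPartRoutingSketch {

  // Illustrative stand-in for a record writer; not Hive's RecordWriter.
  interface Writer {
    void write(String record);
  }

  private final Map<String, Writer> writersByPartition = new HashMap<String, Writer>();
  private final int maxPartitions;

  DynPartRoutingSketch(int maxPartitions) {
    this.maxPartitions = maxPartitions;
  }

  // Same spirit as FileUtils.makePartName(dpColNames, row): join "col=value" pairs with '/'.
  static String makePartName(List<String> dpColNames, List<String> dpValues) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < dpColNames.size(); i++) {
      if (i > 0) {
        sb.append('/');
      }
      sb.append(dpColNames.get(i)).append('=').append(dpValues.get(i));
    }
    return sb.toString();
  }

  Writer getWriter(List<String> dpColNames, List<String> dpValues) {
    final String dir = makePartName(dpColNames, dpValues);
    Writer w = writersByPartition.get(dir);
    if (w == null) {
      if (writersByPartition.size() >= maxPartitions) {
        // the real operator increments a fatal-error counter instead of throwing
        throw new IllegalStateException("too many dynamic partitions: " + dir);
      }
      w = new Writer() {
        public void write(String record) {
          System.out.println(dir + " <- " + record);
        }
      };
      writersByPartition.put(dir, w);
    }
    return w;
  }

  public static void main(String[] args) {
    DynPartRoutingSketch sink = new DynPartRoutingSketch(100);
    List<String> cols = Arrays.asList("ds", "hr");
    sink.getWriter(cols, Arrays.asList("2010-04-14", "12")).write("row 1");
    sink.getWriter(cols, Arrays.asList("2010-04-14", "12")).write("row 2"); // cached writer reused
    sink.getWriter(cols, Arrays.asList("2010-04-15", "00")).write("row 3"); // new partition dir
  }
}
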
Added: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobCloseFeedBack.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobCloseFeedBack.java?rev=934241&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobCloseFeedBack.java (added)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JobCloseFeedBack.java Wed Apr 14 23:36:07 2010
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+
+/**
+ * Feedback information collected from operators at job-close time, for example the
+ * root paths of the dynamic partitions written by FileSinkOperator.
+ **/
+
+public class JobCloseFeedBack {
+  public static enum FeedBackType {
+    DYNAMIC_PARTITIONS,
+    NONE
+  };
+
+  HashMap<FeedBackType, ArrayList<Object>> feedBacks; // one type corresponds to a list of values
+
+  public JobCloseFeedBack() {
+    feedBacks = new HashMap<FeedBackType, ArrayList<Object>>();
+  }
+
+  public void add(FeedBackType t, Object v) {
+    ArrayList<Object> vals = feedBacks.get(t);
+    if (vals == null) {
+      vals = new ArrayList<Object>();
+    }
+    vals.add(v);
+    feedBacks.put(t, vals);
+  }
+
+  public ArrayList<Object> get(FeedBackType t) {
+    return feedBacks.get(t);
+  }
+}
\ No newline at end of file

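A rough usage sketch of the new JobCloseFeedBack container added above: operators record
values by type during jobClose and the client side reads them back after the job finishes.
The calling class and the path literal below are illustrative only; in the patch itself only
FileSinkOperator.jobClose records FeedBackType.DYNAMIC_PARTITIONS entries.

import java.util.ArrayList;

import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack.FeedBackType;

public class FeedBackUsageSketch {
  public static void main(String[] args) {
    JobCloseFeedBack feedBack = new JobCloseFeedBack();

    // An operator (e.g. FileSinkOperator.jobClose) records the root path under which
    // it wrote dynamic-partition subdirectories; the path here is a made-up example.
    feedBack.add(FeedBackType.DYNAMIC_PARTITIONS, "/tmp/hive-scratch/ext-10000");

    // After the job completes, the client side can pick the recorded paths up.
    ArrayList<Object> dpRoots = feedBack.get(FeedBackType.DYNAMIC_PARTITIONS);
    if (dpRoots != null) {
      for (Object root : dpRoots) {
        System.out.println("dynamic partition root: " + root);
      }
    }
  }
}
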
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Wed Apr 14 23:36:07 2010
@@ -130,7 +130,7 @@ public class JoinOperator extends Common
 
   /**
    * All done.
-   * 
+   *
    */
   @Override
   public void closeOp(boolean abort) throws HiveException {
@@ -141,20 +141,21 @@ public class JoinOperator extends Common
   }
 
   @Override
-  public void jobClose(Configuration hconf, boolean success) throws HiveException {
+  public void jobClose(Configuration hconf, boolean success, JobCloseFeedBack feedBack)
+      throws HiveException {
     int numAliases = conf.getExprs().size();
     if (conf.getHandleSkewJoin()) {
       try {
         for (int i = 0; i < numAliases; i++) {
           String specPath = conf.getBigKeysDirMap().get((byte) i);
-          FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+          mvFileToFinalPath(specPath, hconf, success, LOG);
           for (int j = 0; j < numAliases; j++) {
             if (j == i) {
               continue;
             }
             specPath = getConf().getSmallKeysDirMap().get((byte) i).get(
                 (byte) j);
-            FileSinkOperator.mvFileToFinalPath(specPath, hconf, success, LOG);
+            mvFileToFinalPath(specPath, hconf, success, LOG);
           }
         }
 
@@ -177,7 +178,7 @@ public class JoinOperator extends Common
         throw new HiveException(e);
       }
     }
-    super.jobClose(hconf, success);
+    super.jobClose(hconf, success, feedBack);
   }
 
   private void moveUpFiles(String specPath, Configuration hconf, Log log)
@@ -197,8 +198,45 @@ public class JoinOperator extends Common
   }
 
   /**
+   * Similar to FileSinkOperator.mvFileToFinalPath, but without the dynamic-partition and
+   * empty-bucket handling.
+   * @param specPath
+   * @param hconf
+   * @param success
+   * @param log
+   * @throws IOException
+   * @throws HiveException
+   */
+  private void mvFileToFinalPath(String specPath, Configuration hconf,
+      boolean success, Log log) throws IOException, HiveException {
+
+    FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
+    Path tmpPath = Utilities.toTempPath(specPath);
+    Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
+        + ".intermediate");
+    Path finalPath = new Path(specPath);
+    ArrayList<String> emptyBuckets = null;
+    if (success) {
+      if (fs.exists(tmpPath)) {
+        // Step1: rename tmp output folder to intermediate path. After this
+        // point, updates from speculative tasks still writing to tmpPath
+        // will not appear in finalPath.
+        log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
+        Utilities.rename(fs, tmpPath, intermediatePath);
+        // Step2: remove any tmp file or double-committed output files
+        Utilities.removeTempOrDuplicateFiles(fs, intermediatePath);
+        // Step3: move to the file destination
+        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
+        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+      }
+    } else {
+      fs.delete(tmpPath, true);
+    }
+  }
+
+  /**
    * Forward a record of join results.
-   * 
+   *
    * @throws HiveException
    */
   @Override

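The mvFileToFinalPath copy above (and the FileSinkOperator variant earlier in this patch)
both follow the same three-step commit: rename the task tmp directory to an ".intermediate"
directory so that late writes from speculative tasks can no longer reach the published
output, remove temp and double-committed files, then move the result under the final path.
Below is a condensed sketch of that protocol using the public Hadoop FileSystem API; the
".tmp" suffix is an assumption for illustration (the real code derives the tmp path via
Utilities.toTempPath), and step 2 is elided.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch of the tmp -> intermediate -> final rename sequence used for committing output.
public class ThreeStepMoveSketch {
  public static void moveToFinal(Configuration conf, String specPath, boolean success)
      throws Exception {
    Path tmpPath = new Path(specPath + ".tmp");          // assumption: stands in for Utilities.toTempPath
    Path intermediatePath = new Path(specPath + ".intermediate");
    Path finalPath = new Path(specPath);
    FileSystem fs = finalPath.getFileSystem(conf);

    if (!success) {
      fs.delete(tmpPath, true);                          // failed job: discard the tmp output
      return;
    }
    if (fs.exists(tmpPath)) {
      // Step 1: fence off speculative tasks still writing to tmpPath.
      fs.rename(tmpPath, intermediatePath);
      // Step 2 (elided): remove temp files and double-committed duplicates here.
      // Step 3: publish; the real code moves files into finalPath if it already exists.
      fs.rename(intermediatePath, finalPath);
    }
  }
}
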
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Wed Apr 14 23:36:07 2010
@@ -23,6 +23,9 @@ import java.io.Serializable;
 import java.security.AccessControlException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -30,12 +33,13 @@ import org.apache.hadoop.fs.LocalFileSys
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
+import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadFileDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
@@ -113,12 +117,19 @@ public class MoveTask extends Task<MoveW
       // Next we do this for tables and partitions
       LoadTableDesc tbd = work.getLoadTableWork();
       if (tbd != null) {
-        String mesg = "Loading data to table "
-            + tbd.getTable().getTableName()
-            + ((tbd.getPartitionSpec().size() > 0) ? " partition "
-            + tbd.getPartitionSpec().toString() : "");
+        StringBuilder mesg = new StringBuilder("Loading data to table ")
+            .append(tbd.getTable().getTableName());
+        if (tbd.getPartitionSpec().size() > 0) {
+          mesg.append(" partition (");
+          Map<String, String> partSpec = tbd.getPartitionSpec();
+          for (String key: partSpec.keySet()) {
+            mesg.append(key).append('=').append(partSpec.get(key)).append(", ");
+          }
+          mesg.setLength(mesg.length()-2);
+          mesg.append(')');
+        }
         String mesg_detail = " from " + tbd.getSourceDir();
-        console.printInfo(mesg, mesg_detail);
+        console.printInfo(mesg.toString(), mesg_detail);
         Table table = db.getTable(MetaStoreUtils.DEFAULT_DATABASE_NAME, tbd
             .getTable().getTableName());
 
@@ -145,7 +156,8 @@ public class MoveTask extends Task<MoveW
           }
           if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVECHECKFILEFORMAT)) {
             // Check if the file format of the file matches that of the table.
-            boolean flag = HiveFileFormatUtils.checkInputFormat(fs, conf, tbd.getTable().getInputFileFormatClass(), files);
+            boolean flag = HiveFileFormatUtils.checkInputFormat(
+                fs, conf, tbd.getTable().getInputFileFormatClass(), files);
             if (!flag) {
               throw new HiveException(
                   "Wrong file format. Please check the file's format.");
@@ -164,20 +176,62 @@ public class MoveTask extends Task<MoveW
           }
         } else {
           LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
-          db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable()
-              .getTableName(), tbd.getPartitionSpec(), tbd.getReplace(),
-              new Path(tbd.getTmpDir()));
-          Partition partn = db.getPartition(table, tbd.getPartitionSpec(),
-              false);
-          dc = new DataContainer(table.getTTable(), partn.getTPartition());
-          if (work.getOutputs() != null) {
-            work.getOutputs().add(new WriteEntity(partn));
-          }
-        }
+          // deal with dynamic partitions
+          DynamicPartitionCtx dpCtx = tbd.getDPCtx();
+          if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
+            // load the list of DP partitions and return the list of partition specs
+            ArrayList<LinkedHashMap<String, String>> dp =
+              db.loadDynamicPartitions(
+                  new Path(tbd.getSourceDir()),
+                  tbd.getTable().getTableName(),
+                  tbd.getPartitionSpec(),
+                  tbd.getReplace(),
+                  new Path(tbd.getTmpDir()),
+                  dpCtx.getNumDPCols());
+            // for each returned partition spec, look up the Partition object and wrap it
+            // in a WriteEntity so that the post-exec hooks can see it
+            for (LinkedHashMap<String, String> partSpec: dp) {
+              Partition partn = db.getPartition(table, partSpec, false);
+
+              WriteEntity enty = new WriteEntity(partn);
+              if (work.getOutputs() != null) {
+                work.getOutputs().add(enty);
+              }
+              // Need to update the queryPlan's outputs as well so that the post-exec hooks
+              // get executed. This is only needed for dynamic partitioning since for SP the
+              // WriteEntity is constructed at compile time and the queryPlan already contains it.
+              // For DP, WriteEntity creation is deferred to this stage, so we need to update
+              // the queryPlan here.
+              if (queryPlan.getOutputs() == null) {
+                queryPlan.setOutputs(new HashSet<WriteEntity>());
+              }
+              queryPlan.getOutputs().add(enty);
 
-        if (SessionState.get() != null) {
-          SessionState.get().getLineageState()
-            .setLineage(tbd.getSourceDir(), dc, table.getCols());
+              // update columnar lineage for each partition
+              dc = new DataContainer(table.getTTable(), partn.getTPartition());
+
+              if (SessionState.get() != null) {
+                SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
+                    table.getCols());
+              }
+
+              console.printInfo("\tLoading partition " + partSpec);
+            }
+            dc = null; // reset the data container so it is not added to the lineage state again below
+          } else { // static partitions
+            db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
+                tbd.getPartitionSpec(), tbd.getReplace(), new Path(tbd.getTmpDir()));
+            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+            dc = new DataContainer(table.getTTable(), partn.getTPartition());
+            // add this partition to post-execution hook
+            if (work.getOutputs() != null) {
+              work.getOutputs().add(new WriteEntity(partn));
+            }
+          }
+        }
+        if (SessionState.get() != null && dc != null) {
+          SessionState.get().getLineageState().setLineage(tbd.getSourceDir(), dc,
+              table.getCols());
         }
       }
 

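The MoveTask hunk above also reworks the console message so that a static partition spec
prints as "partition (k1=v1, k2=v2)". The standalone sketch below mirrors that message
construction; the table name and spec values are illustrative only.

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the "Loading data to table ..." message built by MoveTask for a static
// partition spec; a LinkedHashMap keeps the partition columns in their declared order.
public class LoadMessageSketch {
  static String buildMessage(String tableName, Map<String, String> partSpec) {
    StringBuilder mesg = new StringBuilder("Loading data to table ").append(tableName);
    if (partSpec != null && partSpec.size() > 0) {
      mesg.append(" partition (");
      for (Map.Entry<String, String> e : partSpec.entrySet()) {
        mesg.append(e.getKey()).append('=').append(e.getValue()).append(", ");
      }
      mesg.setLength(mesg.length() - 2);  // drop the trailing ", "
      mesg.append(')');
    }
    return mesg.toString();
  }

  public static void main(String[] args) {
    Map<String, String> spec = new LinkedHashMap<String, String>();
    spec.put("ds", "2010-04-14");
    spec.put("hr", "12");
    // prints: Loading data to table srcpart partition (ds=2010-04-14, hr=12)
    System.out.println(buildMessage("srcpart", spec));
  }
}
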
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=934241&r1=934240&r2=934241&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Wed Apr 14 23:36:07 2010
@@ -68,7 +68,7 @@ public abstract class Operator<T extends
    * run-time while extracting the operator specific counts.
    */
   protected HashMap<String, ProgressCounter> counterNameToEnum;
-  
+
   private transient ExecMapperContext execContext;
 
   private static int seqId;
@@ -111,7 +111,7 @@ public abstract class Operator<T extends
 
   /**
    * Create an operator with a reporter.
-   * 
+   *
    * @param reporter
    *          Used to report progress of certain operators.
    */
@@ -272,7 +272,7 @@ public abstract class Operator<T extends
 
   /**
    * checks whether all parent operators are initialized or not.
-   * 
+   *
    * @return true if there are no parents or all parents are initialized. false
    *         otherwise
    */
@@ -287,11 +287,11 @@ public abstract class Operator<T extends
     }
     return true;
   }
-  
+
   /**
    * Initializes operators only if all parents have been initialized. Calls
    * operator specific initializer which then initializes child ops.
-   * 
+   *
    * @param hconf
    * @param inputOIs
    *          input object inspector array indexes by tag id. null value is
@@ -343,14 +343,14 @@ public abstract class Operator<T extends
     }
     // derived classes can set this to different object if needed
     outputObjInspector = inputObjInspectors[0];
-    
+
     //pass the exec context to child operators
     passExecContext(this.execContext);
-    
+
     initializeOp(hconf);
     LOG.info("Initialization Done " + id + " " + getName());
   }
-  
+
   public void initializeLocalWork(Configuration hconf) throws HiveException {
     if (childOperators != null) {
       for (int i =0; i<childOperators.size();i++) {
@@ -359,7 +359,7 @@ public abstract class Operator<T extends
       }
     }
   }
-  
+
   /**
    * Operator specific initialization.
    */
@@ -386,9 +386,9 @@ public abstract class Operator<T extends
       }
     }
   }
-  
+
   /**
-   * Pass the execContext reference to every child operator 
+   * Pass the execContext reference to every child operator
    */
   public void passExecContext(ExecMapperContext execContext) {
     this.setExecContext(execContext);
@@ -402,7 +402,7 @@ public abstract class Operator<T extends
   /**
    * Collects all the parent's output object inspectors and calls actual
    * initialization method.
-   * 
+   *
    * @param hconf
    * @param inputOI
    *          OI of the row that this parent will pass to this op
@@ -425,10 +425,10 @@ public abstract class Operator<T extends
     // call the actual operator initialization function
     initialize(hconf, null);
   }
-  
+
   /**
    * Process the row.
-   * 
+   *
    * @param row
    *          The object representing the row.
    * @param tag
@@ -440,7 +440,7 @@ public abstract class Operator<T extends
 
   /**
    * Process the row.
-   * 
+   *
    * @param row
    *          The object representing the row.
    * @param tag
@@ -565,20 +565,20 @@ public abstract class Operator<T extends
   /**
    * Unlike other operator interfaces which are called from map or reduce task,
    * jobClose is called from the jobclient side once the job has completed.
-   * 
+   *
    * @param conf
   *          Configuration with which the job was submitted
    * @param success
    *          whether the job was completed successfully or not
    */
-  public void jobClose(Configuration conf, boolean success)
+  public void jobClose(Configuration conf, boolean success, JobCloseFeedBack feedBack)
       throws HiveException {
     if (childOperators == null) {
       return;
     }
 
     for (Operator<? extends Serializable> op : childOperators) {
-      op.jobClose(conf, success);
+      op.jobClose(conf, success, feedBack);
     }
   }
 
@@ -596,7 +596,7 @@ public abstract class Operator<T extends
   /**
    * Replace one child with another at the same position. The parent of the
    * child is not changed
-   * 
+   *
    * @param child
    *          the old child
    * @param newChild
@@ -630,7 +630,7 @@ public abstract class Operator<T extends
   /**
   * Replace one parent with another at the same position. Children of the new
    * parent are not updated
-   * 
+   *
    * @param parent
    *          the old parent
    * @param newParent
@@ -734,7 +734,7 @@ public abstract class Operator<T extends
 
   /**
    * Implements the getName function for the Node Interface.
-   * 
+   *
    * @return the name of the operator
    */
   public String getName() {
@@ -744,7 +744,7 @@ public abstract class Operator<T extends
   /**
    * Returns a map from output column name to input expression. Note that
    * currently it returns only key columns for ReduceSink and GroupBy operators.
-   * 
+   *
    * @return null if the operator doesn't change columns
    */
   public Map<String, ExprNodeDesc> getColumnExprMap() {
@@ -933,7 +933,7 @@ public abstract class Operator<T extends
 
   /**
    * this is called in operators in map or reduce tasks.
-   * 
+   *
    * @param name
    * @param amount
    */
@@ -978,7 +978,7 @@ public abstract class Operator<T extends
 
   /**
    * called in ExecDriver.progress periodically.
-   * 
+   *
    * @param ctrs
    *          counters from the running job
    */
@@ -1011,7 +1011,7 @@ public abstract class Operator<T extends
   /**
    * Recursively check this operator and its descendants to see if the fatal
    * error counter is set to non-zero.
-   * 
+   *
    * @param ctrs
    */
   public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
@@ -1049,7 +1049,7 @@ public abstract class Operator<T extends
 
   /**
    * Get the fatal error message based on counter's code.
-   * 
+   *
    * @param errMsg
    *          error message should be appended to this output parameter.
    * @param counterValue
@@ -1129,7 +1129,7 @@ public abstract class Operator<T extends
   /**
    * Should be overridden to return the type of the specific operator among the
    * types in OperatorType.
-   * 
+   *
    * @return OperatorType.* or -1 if not overridden
    */
   public int getType() {


