hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From khorg...@apache.org
Subject svn commit: r1568008 - in /hive/trunk/hcatalog/core/src: main/java/org/apache/hive/hcatalog/common/ main/java/org/apache/hive/hcatalog/mapreduce/ test/java/org/apache/hive/hcatalog/mapreduce/
Date Thu, 13 Feb 2014 19:01:15 GMT
Author: khorgath
Date: Thu Feb 13 19:01:15 2014
New Revision: 1568008

URL: http://svn.apache.org/r1568008
Log:
HIVE-6109 : Support customized location for EXTERNAL tables created by Dynamic Partitioning
(Satish Mittal via Sushanth Sowmyan)

Added:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
Modified:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java Thu Feb 13 19:01:15 2014
@@ -121,6 +121,7 @@ public final class HCatConstants {
 
   public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
   public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
+  public static final String HCAT_DYNAMIC_CUSTOM_PATTERN = "hcat.dynamic.partitioning.custom.pattern";
 
   // Message Bus related properties.
   public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java Thu Feb 13 19:01:15 2014
@@ -69,10 +69,13 @@ class FileOutputCommitterContainer exten
 
   private static final String TEMP_DIR_NAME = "_temporary";
   private static final String LOGS_DIR_NAME = "_logs";
+  /** The directory under which data is initially written for a partitioned table */
+  static final String DYNTEMP_DIR_NAME = "_DYN";
 
   private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class);
   private final boolean dynamicPartitioningUsed;
   private boolean partitionsDiscovered;
+  private final boolean customDynamicLocationUsed;
 
   private Map<String, Map<String, String>> partitionsDiscoveredByPath;
   private Map<String, JobContext> contextDiscoveredByPath;
@@ -97,6 +100,14 @@ class FileOutputCommitterContainer exten
 
     this.partitionsDiscovered = !dynamicPartitioningUsed;
     cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+    Table table = new Table(jobInfo.getTableInfo().getTable());
+    if (dynamicPartitioningUsed && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+        && jobInfo.getCustomDynamicPath() != null
+        && jobInfo.getCustomDynamicPath().length() > 0) {
+      customDynamicLocationUsed = true;
+    } else {
+      customDynamicLocationUsed = false;
+    }
   }
 
   @Override
@@ -164,8 +175,12 @@ class FileOutputCommitterContainer exten
       Path src;
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
       if (dynamicPartitioningUsed) {
-        src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
-            .getPartitionKeysSize()));
+        if (!customDynamicLocationUsed) {
+          src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
+              .getPartitionKeysSize()));
+        } else {
+          src = new Path(getCustomPartitionRootLocation(jobInfo, jobContext.getConfiguration()));
+        }
       } else {
         src = new Path(jobInfo.getLocation());
       }
@@ -235,7 +250,26 @@ class FileOutputCommitterContainer exten
     throw new IOException("The method cleanupJob is deprecated and should not be called.");
   }
 
+  private String getCustomPartitionRootLocation(OutputJobInfo jobInfo, Configuration conf) {
+    if (ptnRootLocation == null) {
+      // we only need to calculate it once, it'll be the same for other partitions in this job.
+      String parentPath = jobInfo.getTableInfo().getTableLocation();
+      if (jobInfo.getCustomDynamicRoot() != null
+          && jobInfo.getCustomDynamicRoot().length() > 0) {
+        parentPath = new Path(parentPath, jobInfo.getCustomDynamicRoot()).toString();
+      }
+      Path ptnRoot = new Path(parentPath, DYNTEMP_DIR_NAME +
+          conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+      ptnRootLocation = ptnRoot.toString();
+    }
+    return ptnRootLocation;
+  }
+
   private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
+    if (customDynamicLocationUsed) {
+      return null;
+    }
+
     if (ptnRootLocation == null) {
       // we only need to calculate it once, it'll be the same for other partitions in this job.
       Path ptnRoot = new Path(ptnLocn);
@@ -255,6 +289,7 @@ class FileOutputCommitterContainer exten
    * @param jobInfo The OutputJobInfo.
    * @param partLocnRoot The table-equivalent location root of the partition
    *                       (temporary dir if dynamic partition, table dir if static)
+   * @param dynPartPath The path of dynamic partition which is created
    * @param partKVs The keyvalue pairs that form the partition
    * @param outputSchema The output schema for the partition
    * @param params The parameters to store inside the partition
@@ -268,7 +303,7 @@ class FileOutputCommitterContainer exten
 
   private Partition constructPartition(
     JobContext context, OutputJobInfo jobInfo,
-    String partLocnRoot, Map<String, String> partKVs,
+    String partLocnRoot, String dynPartPath, Map<String, String> partKVs,
     HCatSchema outputSchema, Map<String, String> params,
     Table table, FileSystem fs,
     String grpName, FsPermission perms) throws IOException {
@@ -292,7 +327,10 @@ class FileOutputCommitterContainer exten
     // Sets permissions and group name on partition dirs and files.
 
     Path partPath;
-    if (Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+    if (customDynamicLocationUsed) {
+      partPath = new Path(dynPartPath);
+    } else if (!dynamicPartitioningUsed
+         && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
          && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
       // honor external table that specifies the location
       partPath = new Path(jobInfo.getLocation());
@@ -315,7 +353,7 @@ class FileOutputCommitterContainer exten
 
     // Set the location in the StorageDescriptor
     if (dynamicPartitioningUsed) {
-      String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs);
+      String dynamicPartitionDestination = getFinalDynamicPartitionDestination(table, partKVs, jobInfo);
       if (harProcessor.isEnabled()) {
         harProcessor.exec(context, partition, partPath);
         partition.getSd().setLocation(
@@ -344,14 +382,25 @@ class FileOutputCommitterContainer exten
     }
   }
 
-  private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs) {
-    // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA  ->
-    // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+  private String getFinalDynamicPartitionDestination(Table table, Map<String, String> partKVs,
+      OutputJobInfo jobInfo) {
     Path partPath = new Path(table.getTTable().getSd().getLocation());
-    for (FieldSchema partKey : table.getPartitionKeys()) {
-      partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+    if (!customDynamicLocationUsed) {
+      // file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA  ->
+      // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+      for (FieldSchema partKey : table.getPartitionKeys()) {
+        partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
+      }
+
+      return partPath.toString();
+    } else {
+      // if custom root specified, update the parent path
+      if (jobInfo.getCustomDynamicRoot() != null
+          && jobInfo.getCustomDynamicRoot().length() > 0) {
+        partPath = new Path(partPath, jobInfo.getCustomDynamicRoot());
+      }
+      return new Path(partPath, HCatFileUtil.resolveCustomPath(jobInfo, partKVs, false)).toString();
     }
-    return partPath.toString();
   }
 
   private Map<String, String> getStorerParameterMap(StorerInfo storer) {
@@ -480,8 +529,11 @@ class FileOutputCommitterContainer exten
             if (LOG.isDebugEnabled()) {
               LOG.debug("Moving directory: " + file + " to " + parentDir);
             }
-            if (!fs.rename(file, parentDir)) {
-              final String msg = "Failed to move file: " + file + " to " + parentDir;
+
+            // If custom dynamic location provided, need to rename to final output path
+            Path dstPath = !customDynamicLocationUsed ? parentDir : finalOutputPath;
+            if (!fs.rename(file, dstPath)) {
+              final String msg = "Failed to move file: " + file + " to " + dstPath;
               LOG.error(msg);
               throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
             }
@@ -576,7 +628,12 @@ class FileOutputCommitterContainer exten
 
         for (FileStatus st : status) {
           LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
-          Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+          if (!customDynamicLocationUsed) {
+            Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+          } else {
+            HCatFileUtil.getPartKeyValuesForCustomLocation(fullPartSpec, jobInfo,
+                st.getPath().toString());
+          }
           partitionsDiscoveredByPath.put(st.getPath().toString(), fullPartSpec);
           JobConf jobConf = (JobConf)context.getConfiguration();
           JobContext currContext = HCatMapRedUtil.createJobContext(
@@ -636,7 +693,7 @@ class FileOutputCommitterContainer exten
         partitionsToAdd.add(
             constructPartition(
                 context,jobInfo,
-                tblPath.toString(), jobInfo.getPartitionValues()
+                tblPath.toString(), null, jobInfo.getPartitionValues()
                 ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
                 ,table, fs
                 ,grpName,perms));
@@ -645,7 +702,8 @@ class FileOutputCommitterContainer exten
           partitionsToAdd.add(
               constructPartition(
                   context,jobInfo,
-                  getPartitionRootLocation(entry.getKey(),entry.getValue().size()), entry.getValue()
+                  getPartitionRootLocation(entry.getKey(),entry.getValue().size())
+                  ,entry.getKey(), entry.getValue()
                   ,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
                   ,table, fs
                   ,grpName,perms));
@@ -659,13 +717,16 @@ class FileOutputCommitterContainer exten
 
       //Publish the new partition(s)
       if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
-
-        Path src = new Path(ptnRootLocation);
-        // check here for each dir we're copying out, to see if it
-        // already exists, error out if so
-        moveTaskOutputs(fs, src, src, tblPath, true);
-        moveTaskOutputs(fs, src, src, tblPath, false);
-        fs.delete(src, true);
+        if (!customDynamicLocationUsed) {
+          Path src = new Path(ptnRootLocation);
+          // check here for each dir we're copying out, to see if it
+          // already exists, error out if so
+          moveTaskOutputs(fs, src, src, tblPath, true);
+          moveTaskOutputs(fs, src, src, tblPath, false);
+          fs.delete(src, true);
+        } else {
+          moveCustomLocationTaskOutputs(fs, table, hiveConf);
+        }
         try {
           updateTableSchema(client, table, jobInfo.getOutputSchema());
           LOG.info("HAR is being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
@@ -687,10 +748,14 @@ class FileOutputCommitterContainer exten
         updateTableSchema(client, table, jobInfo.getOutputSchema());
         LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos);
         if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){
-          Path src = new Path(ptnRootLocation);
-          moveTaskOutputs(fs, src, src, tblPath, true);
-          moveTaskOutputs(fs, src, src, tblPath, false);
-          fs.delete(src, true);
+          if (!customDynamicLocationUsed) {
+            Path src = new Path(ptnRootLocation);
+            moveTaskOutputs(fs, src, src, tblPath, true);
+            moveTaskOutputs(fs, src, src, tblPath, false);
+            fs.delete(src, true);
+          } else {
+            moveCustomLocationTaskOutputs(fs, table, hiveConf);
+          }
         }
         client.add_partitions(partitionsToAdd);
         partitionsAdded = partitionsToAdd;
@@ -720,6 +785,24 @@ class FileOutputCommitterContainer exten
     }
   }
 
+  private void moveCustomLocationTaskOutputs(FileSystem fs, Table table, Configuration conf)
+    throws IOException {
+    // in case of custom dynamic partitions, we can't just move the sub-tree of partition root
+    // directory since the partitions location contain regex pattern. We need to first find the
+    // final destination of each partition and move its output.
+    for (Entry<String, Map<String, String>> entry : partitionsDiscoveredByPath.entrySet()) {
+      Path src = new Path(entry.getKey());
+      Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo));
+      moveTaskOutputs(fs, src, src, destPath, true);
+      moveTaskOutputs(fs, src, src, destPath, false);
+    }
+    // delete the parent temp directory of all custom dynamic partitions
+    Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));
+    if (fs.exists(parentPath)) {
+      fs.delete(parentPath, true);
+    }
+  }
+
   private void cancelDelegationTokens(JobContext context) throws IOException{
     LOG.info("Cancelling deletgation token for the job.");
     HiveMetaStoreClient client = null;

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java Thu Feb 13 19:01:15 2014
@@ -49,9 +49,6 @@ import java.util.Map;
 public class FosterStorageHandler extends DefaultStorageHandler {
 
   public Configuration conf;
-  /** The directory under which data is initially written for a partitioned table */
-  protected static final String DYNTEMP_DIR_NAME = "_DYN";
-
   /** The directory under which data is initially written for a non partitioned table */
   protected static final String TEMP_DIR_NAME = "_TEMP";
 
@@ -118,17 +115,28 @@ public class FosterStorageHandler extend
       // For dynamic partitioned writes without all keyvalues specified,
       // we create a temp dir for the associated write job
       if (dynHash != null) {
-        parentPath = new Path(parentPath,
-          DYNTEMP_DIR_NAME + dynHash).toString();
+        // if external table and custom root specified, update the parent path
+        if (Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+            && jobInfo.getCustomDynamicRoot() != null
+            && jobInfo.getCustomDynamicRoot().length() > 0) {
+          parentPath = new Path(parentPath, jobInfo.getCustomDynamicRoot()).toString();
+        }
+        parentPath = new Path(parentPath, FileOutputCommitterContainer.DYNTEMP_DIR_NAME + dynHash).toString();
       }
 
       String outputLocation;
 
-      if ((dynHash == null)
+      if ((dynHash != null)
+          && Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+          && jobInfo.getCustomDynamicPath() != null
+          && jobInfo.getCustomDynamicPath().length() > 0) {
+        // dynamic partitioning with custom path; resolve the custom path
+        // using partition column values
+        outputLocation = HCatFileUtil.resolveCustomPath(jobInfo, null, true);
+      } else if ((dynHash == null)
            && Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
            && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
         // honor custom location for external table apart from what metadata specifies
-        // only if we're not using dynamic partitioning - see HIVE-5011
         outputLocation = jobInfo.getLocation();
       } else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
         // For non-partitioned tables, we send them to the temp dir

Added: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java?rev=1568008&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java (added)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java Thu Feb 13 19:01:15 2014
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+public class HCatFileUtil {
+
+  // regex of the form: ${column name}. Following characters are not allowed in column name:
+  // whitespace characters, /, {, }, \
+  private static final Pattern customPathPattern = Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})");
+
+  // This method parses the custom dynamic path and replaces each occurrence
+  // of column name within regex pattern with its corresponding value, if provided
+  public static String resolveCustomPath(OutputJobInfo jobInfo,
+      Map<String, String> dynPartKVs, boolean createRegexPath) {
+    // get custom path string
+    String customPath = jobInfo.getCustomDynamicPath();
+    // create matcher for custom path
+    Matcher matcher = customPathPattern.matcher(customPath);
+    // get the set of all partition columns in custom path
+    HashSet<String> partColumns = new HashSet<String>();
+    Map<String, String> partKVs = dynPartKVs != null ? dynPartKVs :
+      jobInfo.getPartitionValues();
+
+    // build the final custom path string by replacing each column name with
+    // its value, if provided
+    StringBuilder sb = new StringBuilder();
+    int previousEndIndex = 0;
+    while (matcher.find()) {
+      // append the path substring since previous match
+      sb.append(customPath.substring(previousEndIndex, matcher.start()));
+      if (createRegexPath) {
+        // append the first group within pattern: "${"
+        sb.append(matcher.group(1));
+      }
+
+      // column name is the second group from current match
+      String columnName = matcher.group(2).toLowerCase();
+      partColumns.add(columnName);
+
+      // find the value of matched column
+      String columnValue = partKVs.get(columnName);
+      // if column value is provided, replace column name with value
+      if (columnValue != null) {
+        sb.append(columnValue);
+      } else {
+        sb.append("__HIVE_DEFAULT_PARTITION__");
+      }
+
+      if (createRegexPath) {
+        // append the third group within pattern: "}"
+        sb.append(matcher.group(3));
+      }
+
+      // update startIndex
+      previousEndIndex = matcher.end();
+    }
+
+    // append the trailing path string, if any
+    if (previousEndIndex < customPath.length()) {
+      sb.append(customPath.substring(previousEndIndex, customPath.length()));
+    }
+
+    // validate that the set of partition columns found in custom path must match
+    // the set of dynamic partitions
+    if (partColumns.size() != jobInfo.getDynamicPartitioningKeys().size()) {
+      throw new IllegalArgumentException("Unable to configure custom dynamic location, "
+          + " mismatch between number of dynamic partition columns obtained[" + partColumns.size()
+          + "] and number of dynamic partition columns required["
+          + jobInfo.getDynamicPartitioningKeys().size() + "]");
+    }
+
+    return sb.toString();
+  }
+
+  public static void getPartKeyValuesForCustomLocation(Map<String, String> partSpec,
+      OutputJobInfo jobInfo, String partitionPath) {
+    // create matchers for custom path string as well as actual dynamic partition path created
+    Matcher customPathMatcher = customPathPattern.matcher(jobInfo.getCustomDynamicPath());
+    Matcher dynamicPathMatcher = customPathPattern.matcher(partitionPath);
+
+    while (customPathMatcher.find() && dynamicPathMatcher.find()) {
+      // get column name from custom path matcher and column value from dynamic path matcher
+      partSpec.put(customPathMatcher.group(2), dynamicPathMatcher.group(2));
+    }
+
+    // add any partition key values provided as part of job info
+    partSpec.putAll(jobInfo.getPartitionValues());
+  }
+
+  public static void setCustomPath(String customPathFormat, OutputJobInfo jobInfo) {
+    // find the root of all custom paths from custom pattern. The root is the
+    // largest prefix in input pattern string that doesn't match customPathPattern
+    Path customPath = new Path(customPathFormat);
+    URI customURI = customPath.toUri();
+    while (customPath != null && !customPath.toString().isEmpty()) {
+      Matcher m = customPathPattern.matcher(customPath.toString());
+      if (!m.find()) {
+        break;
+      }
+      customPath = customPath.getParent();
+    }
+
+    URI rootURI = customPath.toUri();
+    URI childURI = rootURI.relativize(customURI);
+    jobInfo.setCustomDynamicLocation(rootURI.getPath(), childURI.getPath());
+  }
+}

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java Thu Feb 13 19:01:15 2014
@@ -159,6 +159,11 @@ public class HCatOutputFormat extends HC
           }
           conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
 
+          // if custom pattern is set in case of dynamic partitioning, configure custom path
+          String customPattern = conf.get(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN);
+          if (customPattern != null) {
+            HCatFileUtil.setCustomPath(customPattern, outputJobInfo);
+          }
         }
 
         outputJobInfo.setPartitionValues(valueMap);

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java Thu Feb 13 19:01:15 2014
@@ -50,6 +50,12 @@ public class OutputJobInfo implements Se
   /** The location of the partition being written */
   private String location;
 
+  /** The root location of custom dynamic partitions being written */
+  private String customDynamicRoot;
+
+  /** The relative path of custom dynamic partitions being written */
+  private String customDynamicPath;
+
   /** The partition values to publish to, if used for output*/
   private Map<String, String> partitionValues;
 
@@ -163,6 +169,28 @@ public class OutputJobInfo implements Se
   }
 
   /**
+   * @param customDynamicLocation the custom location for dynamic partitions
+   */
+  void setCustomDynamicLocation(String customDynamicRoot, String customDynamicPath) {
+    this.customDynamicRoot = customDynamicRoot;
+    this.customDynamicPath = customDynamicPath;
+  }
+
+  /**
+   * @return the root location for custom dynamic partitions
+   */
+  String getCustomDynamicRoot() {
+    return customDynamicRoot;
+  }
+
+  /**
+   * @return the relative path custom location for dynamic partitions
+   */
+  String getCustomDynamicPath() {
+    return customDynamicPath;
+  }
+
+  /**
    * Sets the value of partitionValues
    * @param partitionValues the partition values to set
    */

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java Thu Feb 13 19:01:15 2014
@@ -84,6 +84,7 @@ public abstract class HCatMapReduceTest 
   protected abstract List<FieldSchema> getTableColumns();
 
   private static FileSystem fs;
+  private String externalTableLocation = null;
 
   protected Boolean isTableExternal() {
     return false;
@@ -123,6 +124,12 @@ public abstract class HCatMapReduceTest 
       String databaseName = (dbName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
 
       client.dropTable(databaseName, tableName);
+      // in case of external table, drop the table contents as well
+      if (isTableExternal() && (externalTableLocation != null)) {
+        if (fs.exists(new Path(externalTableLocation))) {
+          fs.delete(new Path(externalTableLocation), true);
+        }
+      }
     } catch (Exception e) {
       e.printStackTrace();
       throw e;
@@ -167,6 +174,9 @@ public abstract class HCatMapReduceTest 
     sd.setOutputFormat(outputFormat());
 
     Map<String, String> tableParams = new HashMap<String, String>();
+    if (isTableExternal()) {
+      tableParams.put("EXTERNAL", "TRUE");
+    }
     tbl.setParameters(tableParams);
 
     client.createTable(tbl);
@@ -234,7 +244,8 @@ public abstract class HCatMapReduceTest 
   Job runMRCreate(Map<String, String> partitionValues,
           List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
           int writeCount, boolean assertWrite) throws Exception {
-    return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite, true);
+    return runMRCreate(partitionValues, partitionColumns, records, writeCount, assertWrite,
+        true, null);
   }
 
   /**
@@ -250,7 +261,8 @@ public abstract class HCatMapReduceTest 
    */
   Job runMRCreate(Map<String, String> partitionValues,
           List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-          int writeCount, boolean assertWrite, boolean asSingleMapTask) throws Exception {
+          int writeCount, boolean assertWrite, boolean asSingleMapTask,
+          String customDynamicPathPattern) throws Exception {
 
     writeRecords = records;
     MapCreate.writeCount = 0;
@@ -283,6 +295,9 @@ public abstract class HCatMapReduceTest 
     job.setOutputFormatClass(HCatOutputFormat.class);
 
     OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName, partitionValues);
+    if (customDynamicPathPattern != null) {
+      job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN, customDynamicPathPattern);
+    }
     HCatOutputFormat.setOutput(job, outputJobInfo);
 
     job.setMapOutputKeyClass(BytesWritable.class);
@@ -313,6 +328,10 @@ public abstract class HCatMapReduceTest 
       Assert.assertEquals(writeCount, MapCreate.writeCount);
     }
 
+    if (isTableExternal()) {
+      externalTableLocation = outputJobInfo.getTableInfo().getTableLocation();
+    }
+
     return job;
   }
 

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Thu Feb 13 19:01:15 2014
@@ -101,7 +101,7 @@ public class TestHCatDynamicPartitioned 
    */
   @Test
   public void testHCatDynamicPartitionedTable() throws Exception {
-    runHCatDynamicPartitionedTable(true);
+    runHCatDynamicPartitionedTable(true, null);
   }
 
   /**
@@ -110,12 +110,13 @@ public class TestHCatDynamicPartitioned 
    */
   @Test
   public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
-    runHCatDynamicPartitionedTable(false);
+    runHCatDynamicPartitionedTable(false, null);
   }
 
-  protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask) throws Exception {
+  protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
+      String customDynamicPathPattern) throws Exception {
     generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask);
+    runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true, asSingleMapTask, customDynamicPathPattern);
 
     runMRRead(NUM_RECORDS);
 
@@ -142,7 +143,8 @@ public class TestHCatDynamicPartitioned 
     IOException exc = null;
     try {
       generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
-      Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false);
+      Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, false,
+          true, customDynamicPathPattern);
 
       if (HCatUtil.isHadoop23()) {
         Assert.assertTrue(job.isSuccessful()==false);

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java Thu Feb 13 19:01:15 2014
@@ -19,6 +19,9 @@
 
 package org.apache.hive.hcatalog.mapreduce;
 
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 public class TestHCatExternalDynamicPartitioned extends TestHCatDynamicPartitioned {
 
   @Override
@@ -26,4 +29,20 @@ public class TestHCatExternalDynamicPart
     return true;
   }
 
+  @BeforeClass
+  public static void generateInputData() throws Exception {
+    tableName = "testHCatExternalDynamicPartitionedTable";
+    generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+    generateDataColumns();
+  }
+
+  /**
+   * Run the external dynamic partitioning test but with single map task
+   * @throws Exception
+   */
+  @Test
+  public void testHCatExternalDynamicCustomLocation() throws Exception {
+    runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
+  }
+
 }



Mime
View raw message