incubator-hcatalog-commits mailing list archives

From tof...@apache.org
Subject svn commit: r1244334 [1/4] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hadoop/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hcatalog/cli/SemanticAnalysis/ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/mapred/ sr...
Date Wed, 15 Feb 2012 03:53:52 GMT
Author: toffer
Date: Wed Feb 15 03:53:50 2012
New Revision: 1244334

URL: http://svn.apache.org/viewvc?rev=1244334&view=rev
Log:
HCATALOG-240 Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer)

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hadoop/
    incubator/hcatalog/trunk/src/java/org/apache/hadoop/mapred/
    incubator/hcatalog/trunk/src/java/org/apache/hadoop/mapred/HCatMapRedUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java.broken
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java.broken
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java.broken
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatStorageHandler.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/ProgressReporter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximLoader.java.broken
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestStorageHandlerProperties.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPermsInheritance.java.broken
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestPigStorageDriver.java.broken
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputStorageDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/RecordWriterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/GroupByAge.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/HBaseReadWrite.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/ReadWrite.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreComplex.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreDemo.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/StoreNumbers.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteJson.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteRC.java
    incubator/hcatalog/trunk/src/test/e2e/hcatalog/udfs/java/org/apache/hcatalog/utils/WriteText.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/DummyStorageHandler.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/cli/TestPermsGrp.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/rcfile/TestRCFileOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/build.xml

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Feb 15 03:53:50 2012
@@ -28,6 +28,8 @@ Trunk (unreleased changes)
   HCAT-2 Support nested schema conversion between Hive an Pig (julienledem via hashutosh)
 
   IMPROVEMENTS
+  HCAT-240. Changes to HCatOutputFormat to make it use SerDes instead of StorageDriver (toffer)
+
   HCAT-194. Better error messages for HCatalog access control errors (julienledem via hashutosh)  
   
   OPTIMIZATIONS

Added: incubator/hcatalog/trunk/src/java/org/apache/hadoop/mapred/HCatMapRedUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hadoop/mapred/HCatMapRedUtil.java?rev=1244334&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hadoop/mapred/HCatMapRedUtil.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hadoop/mapred/HCatMapRedUtil.java Wed Feb 15 03:53:50 2012
@@ -0,0 +1,26 @@
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.util.Progressable;
+
+public class HCatMapRedUtil {
+
+    public static TaskAttemptContext createTaskAttemptContext(org.apache.hadoop.mapreduce.TaskAttemptContext context) {
+        return  createTaskAttemptContext(new JobConf(context.getConfiguration()),
+                                                            org.apache.hadoop.mapred.TaskAttemptID.forName(context.getTaskAttemptID().toString()),
+                                                             Reporter.NULL);
+    }
+
+    public static TaskAttemptContext createTaskAttemptContext(JobConf conf, TaskAttemptID id, Progressable progressable) {
+        return  new TaskAttemptContext(conf,id,progressable);
+    }
+
+    public static org.apache.hadoop.mapred.JobContext createJobContext(org.apache.hadoop.mapreduce.JobContext context) {
+        return createJobContext(new JobConf(context.getConfiguration()),
+                                            context.getJobID(),
+                                            Reporter.NULL);
+    }
+
+    public static JobContext createJobContext(JobConf conf, org.apache.hadoop.mapreduce.JobID id, Progressable progressable) {
+        return  new JobContext(conf,id,progressable);
+    }
+}
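
A minimal sketch (not part of this commit; every name other than HCatMapRedUtil is invented) of the bridging pattern the new utility enables and that the container classes further down rely on: the framework hands HCatalog a new-API (org.apache.hadoop.mapreduce) context, and the wrapped old-API (org.apache.hadoop.mapred) OutputCommitter is driven with a converted context.

import java.io.IOException;

import org.apache.hadoop.mapred.HCatMapRedUtil;
import org.apache.hadoop.mapred.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical wrapper, for illustration only.
public class MapRedCommitterBridge {
    private final OutputCommitter baseCommitter;   // old-API committer being wrapped

    public MapRedCommitterBridge(OutputCommitter baseCommitter) {
        this.baseCommitter = baseCommitter;
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        // Convert the mapreduce context into a mapred one before delegating.
        baseCommitter.commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
    }
}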

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/cli/SemanticAnalysis/CreateTableHook.java Wed Feb 15 03:53:50 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hcatalog.cli.SemanticAnalysis;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
@@ -47,11 +48,9 @@ import org.apache.hcatalog.common.AuthUt
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.pig.drivers.LoadFuncBasedInputDriver;
-import org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputDriver;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
 import org.apache.hcatalog.rcfile.RCFileInputDriver;
 import org.apache.hcatalog.rcfile.RCFileOutputDriver;
-import org.apache.hcatalog.storagehandler.HCatStorageHandler;
 
 final class CreateTableHook extends AbstractSemanticAnalyzerHook {
 
@@ -220,7 +219,6 @@ final class CreateTableHook extends Abst
         }
         CreateTableDesc desc = ((DDLTask) rootTasks.get(rootTasks.size() - 1))
                 .getWork().getCreateTblDesc();
-
         Map<String, String> tblProps = desc.getTblProps();
         if (tblProps == null) {
             // tblProps will be null if user didnt use tblprops in his CREATE
@@ -243,19 +241,19 @@ final class CreateTableHook extends Abst
             // to authorize.
             try {
                 HCatStorageHandler storageHandlerInst = HCatUtil
-                        .getStorageHandler(context.getConf(), storageHandler);
+                        .getStorageHandler(context.getConf(),
+                                                     desc.getStorageHandler(),
+                                                     desc.getSerName(),
+                                                     desc.getInputFormat(),
+                                                     desc.getOutputFormat());
                 HiveAuthorizationProvider auth = storageHandlerInst
                         .getAuthorizationProvider();
 
                 // TBD: To pass in the exact read and write privileges.
                 String databaseName = context.getHive().newTable(desc.getTableName()).getDbName();
                 auth.authorize(context.getHive().getDatabase(databaseName), null, null);
-
-                tblProps.put(HCatConstants.HCAT_ISD_CLASS, storageHandlerInst
-                        .getInputStorageDriver().getName());
-                tblProps.put(HCatConstants.HCAT_OSD_CLASS, storageHandlerInst
-                        .getOutputStorageDriver().getName());
-
+            } catch (IOException e) {
+                throw new SemanticException(e);
             } catch (HiveException e) {
                 throw new SemanticException(e);
             }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Wed Feb 15 03:53:50 2012
@@ -31,6 +31,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -44,8 +45,8 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.thrift.DelegationTokenIdentifier;
@@ -61,8 +62,11 @@ import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.FosterStorageHandler;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
-import org.apache.hcatalog.storagehandler.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.thrift.TException;
 
 public class HCatUtil {
@@ -449,20 +453,58 @@ public class HCatUtil {
         logger.info("\tservice : " + t.getService());
     }
 
+    /**
+     * Create an instance of a storage handler defined in storerInfo. If one cannot be found
+     * then FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
+     * @param conf job configuration, used to configure the StorageHandler if it is Configurable
+     * @param storerInfo StorerInfo defining the StorageHandler, InputFormat, OutputFormat and SerDe
+     * @return storageHandler instance
+     * @throws IOException
+     */
+    public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
+        return getStorageHandler(conf,
+                                              storerInfo.getStorageHandlerClass(),
+                                              storerInfo.getSerdeClass(),
+                                              storerInfo.getIfClass(),
+                                              storerInfo.getOfClass());
+    }
+
+    /**
+     * Create an instance of a storage handler. If storageHandler == null,
+     * then a surrogate FosterStorageHandler is used to encapsulate the InputFormat, OutputFormat and SerDe.
+     * This StorageHandler assumes the other supplied storage artifacts are for a file-based storage system.
+     * @param conf job configuration, used to configure the StorageHandler if it is Configurable
+     * @param storageHandler fully qualified class name of the desired StorageHandler instance
+     * @param serDe fully qualified class name of the desired SerDe instance
+     * @param inputFormat fully qualified class name of the desired InputFormat instance
+     * @param outputFormat fully qualified class name of the desired OutputFormat instance
+     * @return storageHandler instance
+     * @throws IOException
+     */
     public static HCatStorageHandler getStorageHandler(Configuration conf,
-            String className) throws HiveException {
+                                                                                  String storageHandler,
+                                                                                  String serDe,
+                                                                                  String inputFormat,
+                                                                                  String outputFormat) throws IOException {
 
-        if (className == null) {
-            return null;
+
+        if (storageHandler == null) {
+            try {
+                return new FosterStorageHandler(inputFormat,
+                                                                  outputFormat,
+                                                                  serDe);
+            } catch (ClassNotFoundException e) {
+                throw new IOException("Failed to load foster storage handler",e);
+            }
         }
+
         try {
             Class<? extends HCatStorageHandler> handlerClass = (Class<? extends HCatStorageHandler>) Class
-                    .forName(className, true, JavaUtils.getClassLoader());
-            HCatStorageHandler storageHandler = (HCatStorageHandler) ReflectionUtils
-                    .newInstance(handlerClass, conf);
-            return storageHandler;
+                    .forName(storageHandler, true, JavaUtils.getClassLoader());
+            return (HCatStorageHandler)ReflectionUtils.newInstance(handlerClass, conf);
         } catch (ClassNotFoundException e) {
-            throw new HiveException("Error in loading storage handler."
+            throw new IOException("Error in loading storage handler."
                     + e.getMessage(), e);
         }
     }
@@ -478,4 +520,44 @@ public class HCatUtil {
             +"<databasename>.<table name> or <table name>. Got " + tableName);
       }
     }
+
+    public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
+                                                                              JobContext context,
+                                                                              OutputJobInfo outputJobInfo) {
+        //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler
+        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+                                                                   storageHandler.getInputFormatClass(),
+                                                                   IgnoreKeyTextOutputFormat.class,
+                                                                   outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        if(tableDesc.getJobProperties() == null)
+            tableDesc.setJobProperties(new HashMap<String, String>());
+        for (Map.Entry<String,String> el: context.getConfiguration()) {
+           tableDesc.getJobProperties().put(el.getKey(),el.getValue());
+        }
+
+        Map<String,String> jobProperties = new HashMap<String,String>();
+        try {
+            tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+
+            storageHandler.configureOutputJobProperties(tableDesc,jobProperties);
+
+            for(Map.Entry<String,String> el: jobProperties.entrySet()) {
+                context.getConfiguration().set(el.getKey(),el.getValue());
+            }
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to configure StorageHandler",e);
+        }
+    }
+
+    /**
+     * Replace the contents of dest with the contents of src
+     * @param src
+     * @param dest
+     */
+    public static void copyConf(Configuration src, Configuration dest) {
+        dest.clear();
+        for(Map.Entry<String,String> el : src) {
+            dest.set(el.getKey(),el.getValue());
+        }
+    }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java Wed Feb 15 03:53:50 2012
@@ -139,7 +139,7 @@ public class HCatMapredOutputFormat impl
 
     OutputJobInfo outputJobInfo = OutputJobInfo.create(
         dbAndTableName.first, dbAndTableName.second, 
-        ptnValues, null, null);
+        ptnValues);
     
     Job job = new Job(new Configuration()); 
       // TODO : verify with thw if this needs to be shim-ed. There exists no current Shim
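
The call above reflects the narrowed OutputJobInfo.create signature: only database name, table name and partition values remain, the two trailing null arguments (presumably server connection details) having been dropped. A small sketch of a caller follows; it is not part of this commit, and the table and partition names are invented.

import java.util.HashMap;
import java.util.Map;

import org.apache.hcatalog.mapreduce.OutputJobInfo;

// Hypothetical caller of the three-argument factory method used above.
public class OutputJobInfoSketch {
    public static OutputJobInfo forStaticPartition() {
        Map<String, String> partitionValues = new HashMap<String, String>();
        partitionValues.put("ds", "20120215");   // invented partition key/value
        return OutputJobInfo.create("default", "my_table", partitionValues);
    }
}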

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Wed Feb 15 03:53:50 2012
@@ -21,6 +21,7 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -38,54 +39,56 @@ class DefaultOutputCommitterContainer ex
      * @param baseCommitter OutputCommitter to contain
      * @throws IOException
      */
-    public DefaultOutputCommitterContainer(JobContext context, OutputCommitter baseCommitter) throws IOException {
+    public DefaultOutputCommitterContainer(JobContext context, org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
         super(context,baseCommitter);
     }
 
     @Override
     public void abortTask(TaskAttemptContext context) throws IOException {
-        getBaseOutputCommitter().abortTask(context);
+        getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
     }
 
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
-        getBaseOutputCommitter().commitTask(context);
+        getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
     }
 
     @Override
     public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
-        return getBaseOutputCommitter().needsTaskCommit(context);
+        return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
     }
 
     @Override
     public void setupJob(JobContext context) throws IOException {
-        getBaseOutputCommitter().setupJob(context);
+        getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
     }
 
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
-        getBaseOutputCommitter().setupTask(context);
+        getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context));
     }
 
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
-        getBaseOutputCommitter().abortJob(jobContext, state);
+        getBaseOutputCommitter().abortJob(HCatMapRedUtil.createJobContext(jobContext), state);
+        cleanupJob(jobContext);
     }
 
     @Override
     public void commitJob(JobContext jobContext) throws IOException {
-        getBaseOutputCommitter().commitJob(jobContext);
+        getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
+        cleanupJob(jobContext);
     }
 
     @Override
     public void cleanupJob(JobContext context) throws IOException {
-        getBaseOutputCommitter().cleanupJob(context);
+        getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
 
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
 
         //Cancel HCat and JobTracker tokens
         try {
-            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), context.getConfiguration());
+            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration());
             String tokenStrForm = client.getTokenStrForm();
             if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
               client.cancelDelegationToken(tokenStrForm);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputFormatContainer.java Wed Feb 15 03:53:50 2012
@@ -21,11 +21,12 @@ package org.apache.hcatalog.mapreduce;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
 import java.io.IOException;
@@ -37,7 +38,7 @@ import java.io.IOException;
  */
 class DefaultOutputFormatContainer extends OutputFormatContainer {
 
-    public DefaultOutputFormatContainer(OutputFormat<WritableComparable<?>, Writable> of) {
+    public DefaultOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<WritableComparable<?>, Writable> of) {
         super(of);
     }
 
@@ -52,7 +53,8 @@ class DefaultOutputFormatContainer exten
     @Override
     public RecordWriter<WritableComparable<?>, HCatRecord>
     getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return new DefaultRecordWriterContainer(context, getBaseOutputFormat().getRecordWriter(context));
+        return new DefaultRecordWriterContainer(context,
+                getBaseOutputFormat().getRecordWriter(null, new JobConf(context.getConfiguration()),null, InternalUtil.createReporter(context)));
     }
 
 
@@ -67,8 +69,7 @@ class DefaultOutputFormatContainer exten
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context)
             throws IOException, InterruptedException {
-        OutputFormat outputFormat = getBaseOutputFormat();
-        return new DefaultOutputCommitterContainer(context, getBaseOutputFormat().getOutputCommitter(context));
+        return new DefaultOutputCommitterContainer(context, new JobConf(context.getConfiguration()).getOutputCommitter());
     }
 
     /**
@@ -78,8 +79,10 @@ class DefaultOutputFormatContainer exten
      */
     @Override
     public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
-        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
-        outputFormat.checkOutputSpecs(context);
+        org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getBaseOutputFormat();
+        JobConf jobConf = new JobConf(context.getConfiguration());
+        outputFormat.checkOutputSpecs(null,jobConf);
+        HCatUtil.copyConf(jobConf, context.getConfiguration());
     }
 
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultRecordWriterContainer.java Wed Feb 15 03:53:50 2012
@@ -20,10 +20,14 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
 /**
@@ -32,9 +36,10 @@ import org.apache.hcatalog.data.HCatReco
  */
 class DefaultRecordWriterContainer extends RecordWriterContainer {
 
-    private final HCatOutputStorageDriver storageDriver;
-    private final RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter;
+    private final HCatStorageHandler storageHandler;
+    private final SerDe serDe;
     private final OutputJobInfo jobInfo;
+    private final ObjectInspector hcatRecordOI;
 
     /**
      * @param context current JobContext
@@ -43,29 +48,34 @@ class DefaultRecordWriterContainer exten
      * @throws InterruptedException
      */
     public DefaultRecordWriterContainer(TaskAttemptContext context,
-                                        RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) throws IOException, InterruptedException {
+                                        org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseRecordWriter) throws IOException, InterruptedException {
         super(context,baseRecordWriter);
         jobInfo = HCatOutputFormat.getJobInfo(context);
-        this.storageDriver = HCatOutputFormat.getOutputDriverInstance(context, jobInfo);
-        this.baseRecordWriter = baseRecordWriter;
+        storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+        HCatOutputFormat.configureOutputStorageHandler(context);
+        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration());
+        hcatRecordOI = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
+        try {
+            InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
+        } catch (SerDeException e) {
+            throw new IOException("Failed to initialize SerDe",e);
+        }
     }
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
             InterruptedException {
-        baseRecordWriter.close(context);
+        getBaseRecordWriter().close(InternalUtil.createReporter(context));
     }
 
     @Override
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
             InterruptedException {
-        WritableComparable<?> generatedKey = storageDriver.generateKey(value);
-        Writable convertedValue = storageDriver.convertValue(value);
-        baseRecordWriter.write(generatedKey, convertedValue);
+        try {
+            getBaseRecordWriter().write(null, serDe.serialize(value, hcatRecordOI));
+        } catch (SerDeException e) {
+            throw new IOException("Failed to serialize object",e);
+        }
     }
 
-    @Override
-    public RecordWriter<? super WritableComparable<?>, ? super Writable> getBaseRecordWriter() {
-        return baseRecordWriter;
-    }
 }
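
The rewritten write path above is the heart of this change: rather than asking an HCatOutputStorageDriver to generate a key and convert the value, the container serializes the HCatRecord with the table's SerDe against an ObjectInspector built from the output schema, and hands the resulting Writable to the wrapped mapred RecordWriter. The stand-alone sketch below illustrates that serialize step; it uses LazySimpleSerDe and hand-built inspectors purely for illustration and is not part of the commit.

import java.util.Arrays;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.Writable;

// Hypothetical, stand-alone illustration of SerDe-based row serialization.
public class SerDeWriteSketch {
    public static Writable serializeRow(Configuration conf) throws Exception {
        // Describe a two-column table to the SerDe, much as initializeOutputSerDe would
        // from the table's storer properties.
        Properties props = new Properties();
        props.setProperty(Constants.LIST_COLUMNS, "name,age");
        props.setProperty(Constants.LIST_COLUMN_TYPES, "string,int");
        LazySimpleSerDe serDe = new LazySimpleSerDe();
        serDe.initialize(conf, props);

        // An ObjectInspector for the in-memory row (a List of Java objects), standing in
        // for InternalUtil.createStructObjectInspector(outputSchema).
        ObjectInspector rowOI = ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("name", "age"),
                Arrays.<ObjectInspector>asList(
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector));

        // The record writer would pass this Writable to the underlying mapred RecordWriter.
        return serDe.serialize(Arrays.<Object>asList("alice", 30), rowOI);
    }
}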

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Wed Feb 15 03:53:50 2012
@@ -33,11 +33,11 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
-import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
@@ -47,7 +47,6 @@ import org.apache.hcatalog.data.schema.H
 import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.har.HarOutputCommitterPostProcessor;
-import org.apache.hcatalog.shims.HCatHadoopShims;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
@@ -69,7 +68,8 @@ class FileOutputCommitterContainer exten
     private boolean partitionsDiscovered;
 
     private Map<String, Map<String, String>> partitionsDiscoveredByPath;
-    private Map<String, HCatOutputStorageDriver> storageDriversDiscoveredByPath;
+    private Map<String, JobContext> contextDiscoveredByPath;
+    private final HCatStorageHandler cachedStorageHandler;
 
     HarOutputCommitterPostProcessor harProcessor = new HarOutputCommitterPostProcessor();
 
@@ -83,34 +83,39 @@ class FileOutputCommitterContainer exten
      * @throws IOException
      */
     public FileOutputCommitterContainer(JobContext context,
-                                        OutputCommitter baseCommitter) throws IOException {
+                                                          org.apache.hadoop.mapred.OutputCommitter baseCommitter) throws IOException {
         super(context, baseCommitter);
         jobInfo = HCatOutputFormat.getJobInfo(context);
         dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
 
         this.partitionsDiscovered = !dynamicPartitioningUsed;
+        cachedStorageHandler = HCatUtil.getStorageHandler(context.getConfiguration(),jobInfo.getTableInfo().getStorerInfo());
     }
 
     @Override
     public void abortTask(TaskAttemptContext context) throws IOException {
         if (!dynamicPartitioningUsed){
-            getBaseOutputCommitter().abortTask(context);
+            getBaseOutputCommitter().abortTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
 
     @Override
     public void commitTask(TaskAttemptContext context) throws IOException {
         if (!dynamicPartitioningUsed){
-            getBaseOutputCommitter().commitTask(context);
-        }else{
-            // called explicitly through FileRecordWriterContainer.close() if dynamic
+            OutputJobInfo outputJobInfo = HCatOutputFormat.getJobInfo(context);
+            //TODO fix this hack; something is wrong with Pig:
+            //when running multiple storers in a single job, the real output dir gets overwritten,
+            //but the location in OutputJobInfo is still correct, so we use that instead
+            //TestHCatStorer.testMultiPartColsInData() used to fail without this
+            context.getConfiguration().set("mapred.output.dir",outputJobInfo.getLocation());
+            getBaseOutputCommitter().commitTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
 
     @Override
     public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
         if (!dynamicPartitioningUsed){
-            return getBaseOutputCommitter().needsTaskCommit(context);
+            return getBaseOutputCommitter().needsTaskCommit(HCatMapRedUtil.createTaskAttemptContext(context));
         }else{
             // called explicitly through FileRecordWriterContainer.close() if dynamic - return false by default
             return false;
@@ -120,7 +125,7 @@ class FileOutputCommitterContainer exten
     @Override
     public void setupJob(JobContext context) throws IOException {
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().setupJob(context);
+            getBaseOutputCommitter().setupJob(HCatMapRedUtil.createJobContext(context));
         }
         // in dynamic usecase, called through FileRecordWriterContainer
     }
@@ -128,28 +133,25 @@ class FileOutputCommitterContainer exten
     @Override
     public void setupTask(TaskAttemptContext context) throws IOException {
         if (!dynamicPartitioningUsed){
-            getBaseOutputCommitter().setupTask(context);
-        }else{
-            // called explicitly through FileRecordWriterContainer.write() if dynamic
+            getBaseOutputCommitter().setupTask(HCatMapRedUtil.createTaskAttemptContext(context));
         }
     }
 
     @Override
     public void abortJob(JobContext jobContext, State state) throws IOException {
+        org.apache.hadoop.mapred.JobContext
+                mapredJobContext = HCatMapRedUtil.createJobContext(jobContext);
         if (dynamicPartitioningUsed){
             discoverPartitions(jobContext);
         }
 
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().abortJob(jobContext, state);
+            getBaseOutputCommitter().abortJob(mapredJobContext, state);
         }
         else if (dynamicPartitioningUsed){
-            for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+            for(JobContext currContext : contextDiscoveredByPath.values()){
                 try {
-                    baseOsd.abortOutputCommitterJob(
-                            HCatHadoopShims.Instance.get().createTaskAttemptContext(
-                                    jobContext.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
-                            ),state);
+                    new JobConf(currContext.getConfiguration()).getOutputCommitter().abortJob(currContext, state);
                 } catch (Exception e) {
                     throw new IOException(e);
                 }
@@ -159,8 +161,7 @@ class FileOutputCommitterContainer exten
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
 
         try {
-            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(
-                    jobInfo.getServerUri(), jobContext.getConfiguration());
+            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, jobContext.getConfiguration());
             // cancel the deleg. tokens that were acquired for this job now that
             // we are done - we should cancel if the tokens were acquired by
             // HCatOutputFormat and not if they were supplied by Oozie. In the latter
@@ -215,7 +216,7 @@ class FileOutputCommitterContainer exten
             discoverPartitions(jobContext);
         }
         if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-            getBaseOutputCommitter().commitJob(jobContext);
+            getBaseOutputCommitter().commitJob(HCatMapRedUtil.createJobContext(jobContext));
         }
         // create _SUCCESS FILE if so requested.
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
@@ -237,6 +238,7 @@ class FileOutputCommitterContainer exten
 
     @Override
     public void cleanupJob(JobContext context) throws IOException {
+
         if (dynamicPartitioningUsed){
             discoverPartitions(context);
         }
@@ -251,15 +253,13 @@ class FileOutputCommitterContainer exten
         if( table.getPartitionKeys().size() == 0 ) {
             //non partitioned table
             if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-                getBaseOutputCommitter().cleanupJob(context);
+                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
             }
             else if (dynamicPartitioningUsed){
-                for(HCatOutputStorageDriver baseOsd : storageDriversDiscoveredByPath.values()){
+                for(JobContext currContext : contextDiscoveredByPath.values()){
                     try {
-                        baseOsd.cleanupOutputCommitterJob(
-                                HCatHadoopShims.Instance.get().createTaskAttemptContext(
-                                        context.getConfiguration(), TaskAttemptID.forName(ptnRootLocation)
-                                ));
+                        JobConf jobConf = new JobConf(currContext.getConfiguration());
+                        jobConf.getOutputCommitter().cleanupJob(currContext);
                     } catch (Exception e) {
                         throw new IOException(e);
                     }
@@ -280,7 +280,7 @@ class FileOutputCommitterContainer exten
         List<Partition> partitionsAdded = new ArrayList<Partition>();
 
         try {
-            client = HCatOutputFormat.createHiveClient(jobInfo.getServerUri(), conf);
+            client = HCatOutputFormat.createHiveClient(null, conf);
 
             StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
 
@@ -361,7 +361,7 @@ class FileOutputCommitterContainer exten
             }
 
             if(getBaseOutputCommitter() != null && !dynamicPartitioningUsed) {
-                getBaseOutputCommitter().cleanupJob(context);
+                getBaseOutputCommitter().cleanupJob(HCatMapRedUtil.createJobContext(context));
             }
 
             //Cancel HCat and JobTracker tokens
@@ -627,7 +627,6 @@ class FileOutputCommitterContainer exten
     private void discoverPartitions(JobContext context) throws IOException {
         if (!partitionsDiscovered){
             //      LOG.info("discover ptns called");
-
             OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
 
             harProcessor.setEnabled(jobInfo.getHarRequested());
@@ -639,17 +638,15 @@ class FileOutputCommitterContainer exten
             FileSystem fs = loadPath.getFileSystem(context.getConfiguration());
 
             // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
-
             String dynPathSpec = loadPath.toUri().getPath();
             dynPathSpec = dynPathSpec.replaceAll("__HIVE_DEFAULT_PARTITION__", "*");
-            // TODO : replace this with a param pull from HiveConf
 
             //      LOG.info("Searching for "+dynPathSpec);
-            Path pathPattern = new Path(loadPath, dynPathSpec);
+            Path pathPattern = new Path(dynPathSpec);
             FileStatus[] status = fs.globStatus(pathPattern);
 
             partitionsDiscoveredByPath = new LinkedHashMap<String,Map<String, String>>();
-            storageDriversDiscoveredByPath = new LinkedHashMap<String,HCatOutputStorageDriver>();
+            contextDiscoveredByPath = new LinkedHashMap<String,JobContext>();
 
 
             if (status.length == 0) {
@@ -672,8 +669,9 @@ class FileOutputCommitterContainer exten
                     LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>();
                     Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
                     partitionsDiscoveredByPath.put(st.getPath().toString(),fullPartSpec);
-                    storageDriversDiscoveredByPath.put(st.getPath().toString(),
-                            HCatOutputFormat.getOutputDriverInstance(context, jobInfo, fullPartSpec));
+                    JobContext currContext = new JobContext(context.getConfiguration(),context.getJobID());
+                    HCatOutputFormat.configureOutputStorageHandler(context, jobInfo, fullPartSpec);
+                    contextDiscoveredByPath.put(st.getPath().toString(),currContext);
                 }
             }
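
With StorageDrivers gone, the dynamic-partitioning branch above keys its per-partition state by path and re-derives a per-directory JobContext instead of a per-directory driver. The sketch below (not part of the commit; the warehouse path is invented) illustrates the underlying discovery idea: glob for the dynamically generated leaf directories and map each path back to a partition spec.

import java.util.LinkedHashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.Warehouse;

// Hypothetical sketch of dynamic-partition directory discovery.
public class DynamicPartitionDiscoverySketch {
    public static void discover(Configuration conf) throws Exception {
        Path pathPattern = new Path("/user/hive/warehouse/my_table/country=*/state=*");
        FileSystem fs = pathPattern.getFileSystem(conf);
        FileStatus[] matches = fs.globStatus(pathPattern);
        if (matches == null) {
            return;   // nothing was written for this job
        }
        for (FileStatus st : matches) {
            LinkedHashMap<String, String> partSpec = new LinkedHashMap<String, String>();
            // Parses ".../country=US/state=CA" into {country=US, state=CA}.
            Warehouse.makeSpecFromName(partSpec, st.getPath());
            System.out.println(st.getPath() + " -> " + partSpec);
        }
    }
}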
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java Wed Feb 15 03:53:50 2012
@@ -29,13 +29,15 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.thrift.TException;
 
@@ -49,7 +51,6 @@ import java.util.Map;
  * This implementation supports the following HCatalog features: partitioning, dynamic partitioning, Hadoop Archiving, etc.
  */
 class FileOutputFormatContainer extends OutputFormatContainer {
-    private OutputFormat<? super WritableComparable<?>, ? super Writable> of;
 
     private static final PathFilter hiddenFileFilter = new PathFilter(){
       public boolean accept(Path p){
@@ -61,19 +62,29 @@ class FileOutputFormatContainer extends 
     /**
      * @param of base OutputFormat to contain
      */
-    public FileOutputFormatContainer(OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
+    public FileOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat<? super WritableComparable<?>, ? super Writable> of) {
         super(of);
-        this.of = of;
     }
 
     @Override
     public RecordWriter<WritableComparable<?>, HCatRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {
+        //this needs to be set manually; under normal circumstances the MR Task does this
+        setWorkOutputPath(context);
+
         // When Dynamic partitioning is used, the RecordWriter instance initialized here isn't used. Can use null.
         // (That's because records can't be written until the values of the dynamic partitions are deduced.
         // By that time, a new local instance of RecordWriter, with the correct output-path, will be constructed.)
-        return new FileRecordWriterContainer(HCatOutputFormat.getJobInfo(context)
-                                                   .isDynamicPartitioningUsed()? null : of.getRecordWriter(context),
-                                             context);
+        RecordWriter<WritableComparable<?>, HCatRecord> rw =
+            new FileRecordWriterContainer(
+                HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()?
+                    null:
+                    getBaseOutputFormat()
+                            .getRecordWriter(null,
+                                                     new JobConf(context.getConfiguration()),
+                                                                         context.getTaskAttemptID().toString(),
+                                                                         InternalUtil.createReporter(context)),
+                context);
+        return rw;
     }
 
     @Override
@@ -82,7 +93,7 @@ class FileOutputFormatContainer extends 
         try {
             handleDuplicatePublish(context,
                     jobInfo,
-                    HCatOutputFormat.createHiveClient(jobInfo.getServerUri(),context.getConfiguration()),
+                    HCatOutputFormat.createHiveClient(null,context.getConfiguration()),
                     jobInfo.getTableInfo().getTable());
         } catch (MetaException e) {
             throw new IOException(e);
@@ -91,12 +102,23 @@ class FileOutputFormatContainer extends 
         } catch (NoSuchObjectException e) {
             throw new IOException(e);        	
         }
-        of.checkOutputSpecs(context);
+
+        if(!jobInfo.isDynamicPartitioningUsed()) {
+            JobConf jobConf = new JobConf(context.getConfiguration());
+            getBaseOutputFormat().checkOutputSpecs(null, jobConf);
+            //checkOutputSpecs might have set some properties; we need the context to reflect that
+            HCatUtil.copyConf(jobConf,context.getConfiguration());
+        }
     }
 
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
-        return new FileOutputCommitterContainer(context,of.getOutputCommitter(context));
+        //this needs to be set manually; under normal circumstances the MR Task does this
+        setWorkOutputPath(context);
+        return new FileOutputCommitterContainer(context,
+               HCatBaseOutputFormat.getJobInfo(context).isDynamicPartitioningUsed()?
+                       null:
+                       new JobConf(context.getConfiguration()).getOutputCommitter());
     }
 
     /**
@@ -186,4 +208,13 @@ class FileOutputFormatContainer extends 
 
         return values;
     }
+
+    static void setWorkOutputPath(TaskAttemptContext context) throws IOException {
+        String outputPath = context.getConfiguration().get("mapred.output.dir");
+        //we need to do this to get the task path and set it for mapred implementation
+        //since it can't be done automatically because of mapreduce->mapred abstraction
+        if(outputPath != null)
+            context.getConfiguration().set("mapred.work.output.dir",
+                    new FileOutputCommitter(new Path(outputPath), context).getWorkPath().toString());
+    }
 }
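
setWorkOutputPath above reproduces, for the wrapped mapred code path, what the MR task runner normally does: it derives the task-side work directory from mapred.output.dir and publishes it as mapred.work.output.dir, which file-based old-API OutputFormats consult when opening their per-task files. A small sketch of the task-side read of that property follows (invented class name, not part of the commit).

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical task-side counterpart: resolve the per-task scratch directory.
public class WorkOutputPathSketch {
    public static Path taskScratchDir(JobConf jobConf) {
        // Returns the Path stored under mapred.work.output.dir, i.e. the committer's
        // work path that setWorkOutputPath populated.
        return FileOutputFormat.getWorkOutputPath(jobConf);
    }
}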

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputStorageDriver.java Wed Feb 15 03:53:50 2012
@@ -120,7 +120,8 @@ public abstract class FileOutputStorageD
 
     @Override
     OutputFormatContainer getOutputFormatContainer(OutputFormat outputFormat) {
-        return new FileOutputFormatContainer(outputFormat);
+        //broken
+        return new FileOutputFormatContainer(null);
     }
 }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileRecordWriterContainer.java Wed Feb 15 03:53:50 2012
@@ -18,14 +18,24 @@
 
 package org.apache.hcatalog.mapreduce;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.HCatMapRedUtil;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
 import java.io.IOException;
@@ -40,15 +50,19 @@ import java.util.Map;
  */
 class FileRecordWriterContainer extends RecordWriterContainer {
 
-    private final HCatOutputStorageDriver storageDriver;
+    private final HCatStorageHandler storageHandler;
+    private final SerDe serDe;
+    private final ObjectInspector objectInspector;
 
     private boolean dynamicPartitioningUsed = false;
 
 //    static final private Log LOG = LogFactory.getLog(FileRecordWriterContainer.class);
 
-    private final Map<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
-    private final Map<Integer,HCatOutputStorageDriver> baseDynamicStorageDrivers;
-    private final Map<Integer,OutputCommitter> baseDynamicCommitters;
+    private final Map<String, org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>> baseDynamicWriters;
+    private final Map<String, SerDe> baseDynamicSerDe;
+    private final Map<String, org.apache.hadoop.mapred.OutputCommitter> baseDynamicCommitters;
+    private final Map<String, org.apache.hadoop.mapred.TaskAttemptContext> dynamicContexts;
+    private final Map<String, ObjectInspector> dynamicObjectInspectors;
 
 
     private final List<Integer> partColsToDel;
@@ -64,12 +78,21 @@ class FileRecordWriterContainer extends 
      * @throws IOException
      * @throws InterruptedException
      */
-    public FileRecordWriterContainer(RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
+    public FileRecordWriterContainer(org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> baseWriter,
                                      TaskAttemptContext context) throws IOException, InterruptedException {
         super(context,baseWriter);
         this.context = context;
         jobInfo = HCatOutputFormat.getJobInfo(context);
 
+        storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+        serDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),context.getConfiguration());
+        objectInspector = InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema());
+        try {
+            InternalUtil.initializeOutputSerDe(serDe, context.getConfiguration(), jobInfo);
+        } catch (SerDeException e) {
+            throw new IOException("Failed to initialize SerDe",e);
+        }
+
         // If partition columns occur in data, we want to remove them.
         partColsToDel = jobInfo.getPosOfPartCols();
         dynamicPartitioningUsed = jobInfo.isDynamicPartitioningUsed();
@@ -83,53 +106,59 @@ class FileRecordWriterContainer extends 
 
 
         if (!dynamicPartitioningUsed) {
-            storageDriver = HCatOutputFormat.getOutputDriverInstance(context,jobInfo);
-            this.baseDynamicStorageDrivers = null;
+            this.baseDynamicSerDe = null;
             this.baseDynamicWriters = null;
             this.baseDynamicCommitters = null;
-            prepareForStorageDriverOutput(context);
+            this.dynamicContexts = null;
+            this.dynamicObjectInspectors = null;
         }
         else {
-            storageDriver = null;
-            this.baseDynamicStorageDrivers = new HashMap<Integer,HCatOutputStorageDriver>();
-            this.baseDynamicWriters = new HashMap<Integer,RecordWriter<? super WritableComparable<?>, ? super Writable>>();
-            this.baseDynamicCommitters = new HashMap<Integer,OutputCommitter>();
+            this.baseDynamicSerDe = new HashMap<String,SerDe>();
+            this.baseDynamicWriters = new HashMap<String,org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable>>();
+            this.baseDynamicCommitters = new HashMap<String, org.apache.hadoop.mapred.OutputCommitter>();
+            this.dynamicContexts = new HashMap<String, org.apache.hadoop.mapred.TaskAttemptContext>();
+            this.dynamicObjectInspectors = new HashMap<String, ObjectInspector>();
         }
     }
 
     /**
      * @return the storageDriver
      */
-    public HCatOutputStorageDriver getStorageDriver() {
-        return storageDriver;
+    public HCatStorageHandler getStorageHandler() {
+        return storageHandler;
     }
 
     @Override
     public void close(TaskAttemptContext context) throws IOException,
             InterruptedException {
+        Reporter reporter = InternalUtil.createReporter(context);
         if (dynamicPartitioningUsed){
-            for (RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
-                bwriter.close(context);
+            for (org.apache.hadoop.mapred.RecordWriter<? super WritableComparable<?>, ? super Writable> bwriter : baseDynamicWriters.values()){
+                //We are in RecordWriter.close(), so it makes sense that the context here is a TaskInputOutputContext
+                bwriter.close(reporter);
             }
-            for(Map.Entry<Integer,OutputCommitter>entry : baseDynamicCommitters.entrySet()) {
-//            for (HCatOutputStorageDriver osd : baseDynamicStorageDrivers.values()){
+            for(Map.Entry<String,org.apache.hadoop.mapred.OutputCommitter>entry : baseDynamicCommitters.entrySet()) {
+                org.apache.hadoop.mapred.TaskAttemptContext currContext = dynamicContexts.get(entry.getKey());
                 OutputCommitter baseOutputCommitter = entry.getValue();
-                if (baseOutputCommitter.needsTaskCommit(context)){
-                    baseOutputCommitter.commitTask(context);
+                if (baseOutputCommitter.needsTaskCommit(currContext)){
+                    baseOutputCommitter.commitTask(currContext);
                 }
             }
         } else {
-            getBaseRecordWriter().close(context);
+            getBaseRecordWriter().close(reporter);
         }
     }
 
     @Override
     public void write(WritableComparable<?> key, HCatRecord value) throws IOException,
             InterruptedException {
-        RecordWriter localWriter;
-        HCatOutputStorageDriver localDriver;
 
-//      HCatUtil.logList(LOG, "HCatRecord to write", value.getAll());
+        org.apache.hadoop.mapred.RecordWriter localWriter;
+        org.apache.hadoop.mapred.TaskAttemptContext localContext;
+        ObjectInspector localObjectInspector;
+        SerDe localSerDe;
+        OutputJobInfo localJobInfo = null;
+
 
         if (dynamicPartitioningUsed){
             // calculate which writer to use from the remaining values - this needs to be done before we delete cols
@@ -138,11 +167,9 @@ class FileRecordWriterContainer extends 
                 dynamicPartValues.add(value.get(colToAppend).toString());
             }
 
-            int dynHashCode = dynamicPartValues.hashCode();
-            if (!baseDynamicWriters.containsKey(dynHashCode)){
-//          LOG.info("Creating new storage driver["+baseDynamicStorageDrivers.size()
-//              +"/"+maxDynamicPartitions+ "] for "+dynamicPartValues.toString());
-                if ((maxDynamicPartitions != -1) && (baseDynamicStorageDrivers.size() > maxDynamicPartitions)){
+            String dynKey = dynamicPartValues.toString();
+            if (!baseDynamicWriters.containsKey(dynKey)){
+                if ((maxDynamicPartitions != -1) && (baseDynamicWriters.size() > maxDynamicPartitions)){
                     throw new HCatException(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS,
                             "Number of dynamic partitions being created "
                                     + "exceeds configured max allowable partitions["
@@ -151,54 +178,82 @@ class FileRecordWriterContainer extends 
                                     + HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS.varname
                                     + "] if needed.");
                 }
-//          HCatUtil.logList(LOG, "dynamicpartvals", dynamicPartValues);
-//          HCatUtil.logList(LOG, "dynamicpartCols", dynamicPartCols);
 
-                HCatOutputStorageDriver localOsd = createDynamicStorageDriver(dynamicPartValues);
+                org.apache.hadoop.mapred.TaskAttemptContext currTaskContext = HCatMapRedUtil.createTaskAttemptContext(context);
+                configureDynamicStorageHandler(currTaskContext, dynamicPartValues);
+                localJobInfo= HCatBaseOutputFormat.getJobInfo(currTaskContext);
+
+                //setup serDe
+                SerDe currSerDe = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), currTaskContext.getJobConf());
+                try {
+                    InternalUtil.initializeOutputSerDe(currSerDe, currTaskContext.getConfiguration(), localJobInfo);
+                } catch (SerDeException e) {
+                    throw new IOException("Failed to initialize SerDe",e);
+                }
 
-                RecordWriter baseRecordWriter = localOsd.getOutputFormat().getRecordWriter(context);
-                OutputCommitter baseOutputCommitter = localOsd.getOutputFormat().getOutputCommitter(context);
-                baseOutputCommitter.setupJob(context);
-                baseOutputCommitter.setupTask(context);
-                prepareForStorageDriverOutput(localOsd,context);
-                baseDynamicWriters.put(dynHashCode, baseRecordWriter);
-                baseDynamicStorageDrivers.put(dynHashCode,localOsd);
-                baseDynamicCommitters.put(dynHashCode,baseOutputCommitter);
+                //create base OutputFormat
+                org.apache.hadoop.mapred.OutputFormat baseOF =
+                        ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(), currTaskContext.getJobConf());
+                //check outputSpecs
+                baseOF.checkOutputSpecs(null,currTaskContext.getJobConf());
+                //get Output Committer
+                org.apache.hadoop.mapred.OutputCommitter baseOutputCommitter =  currTaskContext.getJobConf().getOutputCommitter();
+                //create currJobContext last so it picks up all the config changes
+                org.apache.hadoop.mapred.JobContext currJobContext = HCatMapRedUtil.createJobContext(currTaskContext);
+                //setupJob()
+                baseOutputCommitter.setupJob(currJobContext);
+                //recreate to refresh jobConf of currTask context
+                currTaskContext =
+                        HCatMapRedUtil.createTaskAttemptContext(currJobContext.getJobConf(),
+                                                                                        currTaskContext.getTaskAttemptID(),
+                                                                                        currTaskContext.getProgressible());
+                //set temp location
+                currTaskContext.getConfiguration().set("mapred.work.output.dir",
+                                new FileOutputCommitter(new Path(localJobInfo.getLocation()),currTaskContext).getWorkPath().toString());
+                //setupTask()
+                baseOutputCommitter.setupTask(currTaskContext);
+
+                org.apache.hadoop.mapred.RecordWriter baseRecordWriter =
+                        baseOF.getRecordWriter(null,
+                                                            currTaskContext.getJobConf(),
+                                                            FileOutputFormat.getUniqueFile(currTaskContext, "part", ""),
+                                                            InternalUtil.createReporter(currTaskContext));
+
+                baseDynamicWriters.put(dynKey, baseRecordWriter);
+                baseDynamicSerDe.put(dynKey,currSerDe);
+                baseDynamicCommitters.put(dynKey,baseOutputCommitter);
+                dynamicContexts.put(dynKey,currTaskContext);
+                dynamicObjectInspectors.put(dynKey,InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema()));
             }
-
-            localWriter = baseDynamicWriters.get(dynHashCode);
-            localDriver = baseDynamicStorageDrivers.get(dynHashCode);
-        }else{
+            localJobInfo = HCatOutputFormat.getJobInfo(dynamicContexts.get(dynKey));
+            localWriter = baseDynamicWriters.get(dynKey);
+            localSerDe = baseDynamicSerDe.get(dynKey);
+            localContext = dynamicContexts.get(dynKey);
+            localObjectInspector = dynamicObjectInspectors.get(dynKey);
+        }
+        else{
+            localJobInfo = HCatBaseOutputFormat.getJobInfo(context);
             localWriter = getBaseRecordWriter();
-            localDriver = storageDriver;
+            localSerDe = serDe;
+            localContext = HCatMapRedUtil.createTaskAttemptContext(context);
+            localObjectInspector = objectInspector;
         }
 
         for(Integer colToDel : partColsToDel){
             value.remove(colToDel);
         }
 
-        //The key given by user is ignored
-        WritableComparable<?> generatedKey = localDriver.generateKey(value);
-        Writable convertedValue = localDriver.convertValue(value);
-        localWriter.write(generatedKey, convertedValue);
-    }
 
-    protected HCatOutputStorageDriver createDynamicStorageDriver(List<String> dynamicPartVals) throws IOException {
-        HCatOutputStorageDriver localOsd = HCatOutputFormat.getOutputDriverInstance(context,jobInfo,dynamicPartVals);
-        return localOsd;
-    }
-
-    public void prepareForStorageDriverOutput(TaskAttemptContext context) throws IOException {
-        // Set permissions and group on freshly created files.
-        if (!dynamicPartitioningUsed){
-            HCatOutputStorageDriver localOsd = this.getStorageDriver();
-            prepareForStorageDriverOutput(localOsd,context);
+        //The key given by user is ignored
+        try {
+            localWriter.write(null, localSerDe.serialize(value.getAll(), localObjectInspector));
+        } catch (SerDeException e) {
+            throw new IOException("Failed to serialize object",e);
         }
     }
 
-    private void prepareForStorageDriverOutput(HCatOutputStorageDriver localOsd,
-                                               TaskAttemptContext context) throws IOException {
-        FileOutputStorageDriver.prepareOutputLocation(localOsd, context);
+    protected void configureDynamicStorageHandler(JobContext context, List<String> dynamicPartVals) throws IOException {
+        HCatOutputFormat.configureOutputStorageHandler(context, dynamicPartVals);
     }
 
 }
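
The write path above replaces storage-driver key/value generation with straight SerDe serialization:
each HCatRecord is serialized against a struct ObjectInspector built from the output schema, and the
resulting Writable goes to the underlying mapred RecordWriter with a null key (per-partition writers,
committers and TaskAttemptContexts are cached by the stringified dynamic-partition values). A minimal
sketch of the serialization step, assuming serDe, objectInspector and recordWriter are initialized as
in the constructor above, and record stands for the HCatRecord passed to write():

    // serDe initialized via InternalUtil.initializeOutputSerDe(serDe, conf, jobInfo);
    // objectInspector via InternalUtil.createStructObjectInspector(jobInfo.getOutputSchema())
    try {
        Writable serialized = serDe.serialize(record.getAll(), objectInspector);
        recordWriter.write(null, serialized);   // the user-supplied key is ignored
    } catch (SerDeException e) {
        throw new IOException("Failed to serialize object", e);
    }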

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java?rev=1244334&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java Wed Feb 15 03:53:50 2012
@@ -0,0 +1,157 @@
+package org.apache.hcatalog.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.io.RCFile;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *  This class is used to encapsulate the InputFormat, OutputFormat and SerDe
+ *  artifacts of tables which don't define a storage handler. This StorageHandler assumes
+ *  the supplied storage artifacts are for a file-based storage system.
+ */
+public class FosterStorageHandler extends HCatStorageHandler {
+
+    public Configuration conf;
+    /** The directory under which data is initially written for a partitioned table */
+    protected static final String DYNTEMP_DIR_NAME = "_DYN";
+
+    /** The directory under which data is initially written for a non partitioned table */
+    protected static final String TEMP_DIR_NAME = "_TEMP";
+
+   private Class<? extends InputFormat> ifClass;
+   private Class<? extends OutputFormat> ofClass;
+   private Class<? extends SerDe> serDeClass;
+
+    public FosterStorageHandler(String ifName, String ofName, String serdeName) throws ClassNotFoundException {
+        this((Class<? extends InputFormat>) Class.forName(ifName),
+                (Class<? extends OutputFormat>) Class.forName(ofName),
+                (Class<? extends SerDe>) Class.forName(serdeName));
+    }
+    public FosterStorageHandler(Class<? extends InputFormat> ifClass,
+                                               Class<? extends OutputFormat> ofClass,
+                                               Class<? extends SerDe> serDeClass) {
+        this.ifClass = ifClass;
+        this.ofClass = ofClass;
+        this.serDeClass = serDeClass;
+    }
+
+    @Override
+    public Class<? extends InputFormat> getInputFormatClass() {
+        return ifClass;
+    }
+
+    @Override
+    public Class<? extends OutputFormat> getOutputFormatClass() {
+        return ofClass;
+    }
+
+    @Override
+    public Class<? extends SerDe> getSerDeClass() {
+        return serDeClass;
+    }
+
+    @Override
+    public HiveMetaHook getMetaHook() {
+        return null;
+    }
+
+    @Override
+    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+    }
+
+    @Override
+    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+        try {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(tableDesc.getJobProperties().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            String parentPath = jobInfo.getTableInfo().getTableLocation();
+            String dynHash = tableDesc.getJobProperties().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+
+            // For dynamic partitioned writes without all key values specified,
+            // we create a temp dir for the associated write job
+            if (dynHash != null){
+                parentPath = new Path(parentPath, DYNTEMP_DIR_NAME+dynHash).toString();
+            }
+
+            String outputLocation;
+
+            // For non-partitioned tables, we send them to the temp dir
+            if(dynHash == null && jobInfo.getPartitionValues().size() == 0) {
+                outputLocation = TEMP_DIR_NAME;
+            }
+            else {
+                List<String> cols = new ArrayList<String>();
+                List<String> values = new ArrayList<String>();
+
+                //sort the cols and vals
+                for(String name: jobInfo.getTableInfo().getPartitionColumns().getFieldNames()) {
+                    String value = jobInfo.getPartitionValues().get(name);
+                    int i=0;
+                    while(i <cols.size() && name.compareTo(cols.get(i)) > 0)
+                        i++;
+                    cols.add(i,name);
+                    values.add(i,value);
+                }
+                outputLocation = FileUtils.makePartName(cols, values);
+            }
+
+            jobInfo.setLocation(new Path(parentPath,outputLocation).toString());
+
+            //only set output dir if partition is fully materialized
+            if(jobInfo.getPartitionValues().size() == jobInfo.getTableInfo().getPartitionColumns().size()) {
+                jobProperties.put("mapred.output.dir", jobInfo.getLocation());
+            }
+
+            //TODO find a better home for this, RCFile specific
+            jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
+                                      Integer.toOctalString(jobInfo.getOutputSchema().getFields().size()));
+            jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
+                                      HCatUtil.serialize(jobInfo));
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to set output path",e);
+        }
+
+    }
+
+    @Override
+    OutputFormatContainer getOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat outputFormat) {
+        return new FileOutputFormatContainer(outputFormat);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+    }
+
+    @Override
+    public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+        return new DefaultHiveAuthorizationProvider();
+    }
+
+}
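
FosterStorageHandler's main job at write time is computing where data lands before commit: the table
location (plus a _DYN<hash> temp dir for dynamic-partition jobs, or _TEMP for non-partitioned tables)
is combined with a Hive-style partition path built from the sorted partition columns. A rough
illustration using org.apache.hadoop.fs.Path and Hive's FileUtils.makePartName; the table location,
hash and partition spec below are made up:

    String tableLocation = "hdfs://nn/warehouse/mytable";     // from the table info
    String dynHash = "12345";                                 // non-null only for dynamic-partition jobs
    String parentPath = new Path(tableLocation, "_DYN" + dynHash).toString();
    String partName = FileUtils.makePartName(Arrays.asList("ds", "region"),
                                             Arrays.asList("20120215", "us"));
    // partName is "ds=20120215/region=us"; the final location becomes
    // hdfs://nn/warehouse/mytable/_DYN12345/ds=20120215/region=us
    String location = new Path(parentPath, partName).toString();

mapred.output.dir is only set from this location once every partition key has a value, so
partially-specified dynamic writes are not pointed at a concrete output directory prematurely.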

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1244334&r1=1244333&r2=1244334&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Wed Feb 15 03:53:50 2012
@@ -23,14 +23,14 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -62,7 +62,7 @@ public abstract class HCatBaseOutputForm
   @Override
   public void checkOutputSpecs(JobContext context
                                         ) throws IOException, InterruptedException {
-      getOutputFormat(context).checkOutputSpecs(context);
+    getOutputFormat(context).checkOutputSpecs(context);
   }
 
   /**
@@ -73,8 +73,10 @@ public abstract class HCatBaseOutputForm
    */
   protected OutputFormat<WritableComparable<?>, HCatRecord> getOutputFormat(JobContext context) throws IOException {
       OutputJobInfo jobInfo = getJobInfo(context);
-      HCatOutputStorageDriver  driver = getOutputDriverInstance(context, jobInfo);
-      return driver.getOutputFormatContainer(driver.getOutputFormat());
+      HCatStorageHandler  storageHandler = HCatUtil.getStorageHandler(context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
+      //why do we need this?
+      configureOutputStorageHandler(context);
+      return storageHandler.getOutputFormatContainer(ReflectionUtils.newInstance(storageHandler.getOutputFormatClass(),context.getConfiguration()));
   }
 
   /**
@@ -97,31 +99,27 @@ public abstract class HCatBaseOutputForm
   /**
    * Gets the output storage driver instance.
    * @param jobContext the job context
-   * @param jobInfo the output job info
    * @return the output driver instance
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  static HCatOutputStorageDriver getOutputDriverInstance(
-          JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
-    return getOutputDriverInstance(jobContext,jobInfo,(List<String>)null);
+  static void configureOutputStorageHandler(
+          JobContext jobContext) throws IOException {
+    configureOutputStorageHandler(jobContext,(List<String>)null);
   }
 
   /**
    * Gets the output storage driver instance, with allowing specification of missing dynamic partvals
    * @param jobContext the job context
-   * @param jobInfo the output job info
    * @return the output driver instance
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  static HCatOutputStorageDriver getOutputDriverInstance(
-          JobContext jobContext, OutputJobInfo jobInfo, List<String> dynamicPartVals) throws IOException {
+  static void configureOutputStorageHandler(
+          JobContext jobContext, List<String> dynamicPartVals) throws IOException {
       try {
-          Class<? extends HCatOutputStorageDriver> driverClass =
-              (Class<? extends HCatOutputStorageDriver>)
-              Class.forName(jobInfo.getTableInfo().getStorerInfo().getOutputSDClass());
-          HCatOutputStorageDriver driver = driverClass.newInstance();
+          OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+          HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(jobContext.getConfiguration(),jobInfo.getTableInfo().getStorerInfo());
 
           Map<String, String> partitionValues = jobInfo.getPartitionValues();
           String location = jobInfo.getLocation();
@@ -139,28 +137,17 @@ public abstract class HCatBaseOutputForm
               partitionValues.put(dynamicPartKeys.get(i), dynamicPartVals.get(i));
             }
 
-            // re-home location, now that we know the rest of the partvals
-            Table table = jobInfo.getTableInfo().getTable();
-            
-            List<String> partitionCols = new ArrayList<String>();
-            for(FieldSchema schema : table.getPartitionKeys()) {
-              partitionCols.add(schema.getName());
-            }
-
-            location = driver.getOutputLocation(jobContext,
-                table.getSd().getLocation() , partitionCols,
-                partitionValues,jobContext.getConfiguration().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID));
+//            // re-home location, now that we know the rest of the partvals
+//            Table table = jobInfo.getTableInfo().getTable();
+//
+//            List<String> partitionCols = new ArrayList<String>();
+//            for(FieldSchema schema : table.getPartitionKeys()) {
+//              partitionCols.add(schema.getName());
+//            }
+            jobInfo.setPartitionValues(partitionValues);
           }
 
-          //Initialize the storage driver
-          driver.setSchema(jobContext, jobInfo.getOutputSchema());
-          driver.setPartitionValues(jobContext, partitionValues);
-          driver.setOutputPath(jobContext, location);
-          
-          driver.initialize(jobContext, jobInfo.getTableInfo().getStorerInfo().getProperties());
-          
-//          HCatUtil.logMap(LOG,"Setting outputPath ["+location+"] for ",partitionValues);
-          return driver;
+          HCatUtil.configureOutputStorageHandler(storageHandler,jobContext,jobInfo);
       } catch(Exception e) {
         if (e instanceof HCatException){
           throw (HCatException)e;
@@ -179,18 +166,18 @@ public abstract class HCatBaseOutputForm
    * @throws IOException
    */
 
-  protected static HCatOutputStorageDriver getOutputDriverInstance(
+  protected static void configureOutputStorageHandler(
       JobContext context, OutputJobInfo jobInfo,
       Map<String, String> fullPartSpec) throws IOException {
     List<String> dynamicPartKeys = jobInfo.getDynamicPartitioningKeys();
     if ((dynamicPartKeys == null)||(dynamicPartKeys.isEmpty())){
-      return getOutputDriverInstance(context,jobInfo,(List<String>)null);
+      configureOutputStorageHandler(context, (List<String>) null);
     }else{
       List<String> dynKeyVals = new ArrayList<String>();
       for (String dynamicPartKey : dynamicPartKeys){
         dynKeyVals.add(fullPartSpec.get(dynamicPartKey));
       }
-      return getOutputDriverInstance(context,jobInfo,dynKeyVals);
+      configureOutputStorageHandler(context, dynKeyVals);
     }
   }
 
@@ -239,7 +226,7 @@ public abstract class HCatBaseOutputForm
   }
 
   static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception {
-    HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(outputJobInfo.getServerUri(), context.getConfiguration());
+    HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration());
     // cancel the deleg. tokens that were acquired for this job now that
     // we are done - we should cancel if the tokens were acquired by
     // HCatOutputFormat and not if they were supplied by Oozie. In the latter

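To summarize the HCatBaseOutputFormat change: instead of instantiating an HCatOutputStorageDriver
from a class name stored in StorerInfo, the format now asks HCatUtil for the table's
HCatStorageHandler, configures it, and lets it wrap a reflectively created mapred OutputFormat in an
OutputFormatContainer. A condensed sketch of that flow, with context and jobInfo as in
getOutputFormat above:

    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
            context.getConfiguration(), jobInfo.getTableInfo().getStorerInfo());
    org.apache.hadoop.mapred.OutputFormat baseOF = ReflectionUtils.newInstance(
            storageHandler.getOutputFormatClass(), context.getConfiguration());
    OutputFormatContainer container = storageHandler.getOutputFormatContainer(baseOF);
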
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java.broken
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java.broken?rev=1244334&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java.broken (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java.broken Wed Feb 15 03:53:50 2012
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+/** The InputFormat to use to read data from HCat */
+public class HCatEximInputFormat extends HCatBaseInputFormat {
+
+  /**
+   * Set the input to use for the Job. This queries the metadata file with
+   * the specified partition predicates, gets the matching partitions, puts
+   * the information in the conf object. The inputInfo object is updated with
+   * information needed in the client context
+   *
+   * @param job the job object
+   * @return two hcat schemas, for the table columns and the partition keys
+   * @throws IOException
+   *           the exception in communicating with the metadata server
+   */
+  public static List<HCatSchema> setInput(Job job,
+      String location,
+      Map<String, String> partitionFilter) throws IOException {
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(new URI(location), job.getConfiguration());
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    Path fromPath = new Path(location);
+    Path metadataPath = new Path(fromPath, "_metadata");
+    try {
+      Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> tp = EximUtil
+      .readMetaData(fs, metadataPath);
+      org.apache.hadoop.hive.metastore.api.Table table = tp.getKey();
+      InputJobInfo inputInfo = InputJobInfo.create(table.getDbName(), table.getTableName(),null,null,null);
+      List<FieldSchema> partCols = table.getPartitionKeys();
+      List<PartInfo> partInfoList = null;
+      if (partCols.size() > 0) {
+        List<String> partColNames = new ArrayList<String>(partCols.size());
+        for (FieldSchema fsc : partCols) {
+          partColNames.add(fsc.getName());
+        }
+        List<Partition> partitions = tp.getValue();
+        partInfoList = filterPartitions(partitionFilter, partitions, table.getPartitionKeys());
+      } else {
+        partInfoList = new ArrayList<PartInfo>(1);
+        HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getSd().getCols()));
+        Map<String,String> parameters = table.getParameters();
+        String inputStorageDriverClass = null;
+        if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){
+          inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS);
+        }else{
+          throw new IOException("No input storage driver classname found, cannot read partition");
+        }
+        Properties hcatProperties = new Properties();
+        for (String key : parameters.keySet()){
+          if (key.startsWith(InitializeInput.HCAT_KEY_PREFIX)){
+            hcatProperties.put(key, parameters.get(key));
+          }
+        }
+        PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass,  location + "/data", hcatProperties);
+        partInfoList.add(partInfo);
+      }
+      inputInfo.setPartitions(partInfoList);
+      inputInfo.setTableInfo(HCatTableInfo.valueOf(table));
+      job.getConfiguration().set(
+          HCatConstants.HCAT_KEY_JOB_INFO,
+          HCatUtil.serialize(inputInfo));
+      List<HCatSchema> rv = new ArrayList<HCatSchema>(2);
+      rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols()));
+      rv.add(HCatSchemaUtils.getHCatSchema(partCols));
+      return rv;
+    } catch(SemanticException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static List<PartInfo> filterPartitions(Map<String, String> partitionFilter,
+      List<Partition> partitions, List<FieldSchema> partCols) throws IOException {
+    List<PartInfo> partInfos = new LinkedList<PartInfo>();
+    for (Partition partition : partitions) {
+      boolean matches = true;
+      List<String> partVals = partition.getValues();
+      assert partCols.size() == partVals.size();
+      Map<String, String> partSpec = EximUtil.makePartSpec(partCols, partVals);
+      if (partitionFilter != null) {
+        for (Map.Entry<String, String> constraint : partitionFilter.entrySet()) {
+          String partVal = partSpec.get(constraint.getKey());
+          if ((partVal == null) || !partVal.equals(constraint.getValue())) {
+            matches = false;
+            break;
+          }
+        }
+      }
+      if (matches) {
+        PartInfo partInfo = InitializeInput.extractPartInfo(partition.getSd(),
+            partition.getParameters());
+        partInfo.setPartitionValues(partSpec);
+        partInfos.add(partInfo);
+      }
+    }
+    return partInfos;
+  }
+}


