incubator-hcatalog-commits mailing list archives

From: ga...@apache.org
Subject: svn commit: r1293495 [1/2] - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/common/ src/java/org/apache/hcatalog/data/ src/java/org/apache/hcatalog/mapreduce/ src/java/org/apache/hcatalog/pig/ src/test/org/apache/hcatalog/data/ src/test/...
Date: Sat, 25 Feb 2012 00:45:14 GMT
Author: gates
Date: Sat Feb 25 00:45:13 2012
New Revision: 1293495

URL: http://svn.apache.org/viewvc?rev=1293495&view=rev
Log:
HCATALOG-239 Changes to HCatInputFormat to make it use SerDes instead of StorageDrivers
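
[Editor's note: for context, a minimal sketch of how a MapReduce driver reads a table through HCatInputFormat after this change. The job no longer names an HCatInputStorageDriver; the table's SerDe and underlying InputFormat are resolved from its storage handler when splits and record readers are created (see the HCatBaseInputFormat hunks below). The InputJobInfo.create(...) factory signature and table names used here are assumptions for illustration, not taken verbatim from this patch.]

    // Illustrative driver only; anything not shown in the diff below
    // (e.g. the exact InputJobInfo.create signature) is assumed.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;
    import org.apache.hcatalog.mapreduce.InputJobInfo;

    public class ReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hcat-read-example");

        // Describe what to read; no StorageDriver class is supplied --
        // the SerDe/InputFormat come from the table's storage handler.
        InputJobInfo inputJobInfo =
            InputJobInfo.create("default", "my_table", null /* partition filter */);
        HCatInputFormat.setInput(job, inputJobInfo);

        job.setInputFormatClass(HCatInputFormat.class);
        // mapper/reducer, output format, etc. omitted
        job.waitForCompletion(true);
      }
    }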

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/DataType.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/LazyHCatRecord.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestHCatRecordSerDe.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/TestLazyHCatRecord.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatHiveCompatibility.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Sat Feb 25 00:45:13 2012
@@ -21,6 +21,8 @@ Apache HCatalog Change Log
 Trunk (unreleased changes)
 
   INCOMPATIBLE CHANGES
+  HCAT-239. Changes to HCatInputFormat to make it use SerDes instead of StorageDrivers (vikram.dixit via gates)
+
   HCAT-245. StorageHandler authorization providers (enis via hashutosh) 
 
   HCAT-241. Changes to HCatRecord to support switch from StorageDriver to SerDe (khorgath)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/ErrorType.java Sat Feb 25 00:45:13 2012
@@ -26,6 +26,7 @@ public enum ErrorType {
     ERROR_DB_INIT                       (1000, "Error initializing database session"),
     ERROR_EXCEED_MAXPART                (1001, "Query result exceeded maximum number of partitions allowed"),
 
+    ERROR_SET_INPUT                    (1002, "Error setting input information"),
 
     /* HCat Output Format related errors 2000 - 2999 */
     ERROR_INVALID_TABLE                 (2000, "Table specified does not exist"),

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatUtil.java Sat Feb 25 00:45:13 2012
@@ -35,10 +35,12 @@ import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -70,13 +72,14 @@ import org.apache.hcatalog.data.schema.H
 import org.apache.hcatalog.mapreduce.FosterStorageHandler;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.HCatStorageHandler;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.apache.hcatalog.mapreduce.StorerInfo;
 import org.apache.thrift.TException;
 
 public class HCatUtil {
 
-//     static final private Log LOG = LogFactory.getLog(HCatUtil.class);
+    static final private Log LOG = LogFactory.getLog(HCatUtil.class);
 
     public static boolean checkJobContextIfRunningFromBackend(JobContext j) {
         if (j.getConfiguration().get("mapred.task.id", "").equals("")) {
@@ -469,10 +472,10 @@ public class HCatUtil {
      */
     public static HCatStorageHandler getStorageHandler(Configuration conf, StorerInfo storerInfo) throws IOException {
         return getStorageHandler(conf,
-                                              storerInfo.getStorageHandlerClass(),
-                                              storerInfo.getSerdeClass(),
-                                              storerInfo.getIfClass(),
-                                              storerInfo.getOfClass());
+                                 storerInfo.getStorageHandlerClass(),
+                                 storerInfo.getSerdeClass(),
+                                 storerInfo.getIfClass(),
+                                 storerInfo.getOfClass());
     }
 
     /**
@@ -488,26 +491,29 @@ public class HCatUtil {
      * @throws IOException
      */
     public static HCatStorageHandler getStorageHandler(Configuration conf,
-                                                                                  String storageHandler,
-                                                                                  String serDe,
-                                                                                  String inputFormat,
-                                                                                  String outputFormat) throws IOException {
-
+                                                       String storageHandler,
+                                                       String serDe,
+                                                       String inputFormat,
+                                                       String outputFormat) 
+    throws IOException {
 
         if (storageHandler == null) {
             try {
                 return new FosterStorageHandler(inputFormat,
-                                                                  outputFormat,
-                                                                  serDe);
+                                                outputFormat,
+                                                serDe);
             } catch (ClassNotFoundException e) {
-                throw new IOException("Failed to load foster storage handler",e);
+                throw new IOException("Failed to load "
+                    + "foster storage handler",e);
             }
         }
 
         try {
-            Class<? extends HCatStorageHandler> handlerClass = (Class<? extends HCatStorageHandler>) Class
+            Class<? extends HCatStorageHandler> handlerClass = 
+                        (Class<? extends HCatStorageHandler>) Class
                     .forName(storageHandler, true, JavaUtils.getClassLoader());
-            return (HCatStorageHandler)ReflectionUtils.newInstance(handlerClass, conf);
+            return (HCatStorageHandler)ReflectionUtils.newInstance(
+                                                            handlerClass, conf);
         } catch (ClassNotFoundException e) {
             throw new IOException("Error in loading storage handler."
                     + e.getMessage(), e);
@@ -526,27 +532,45 @@ public class HCatUtil {
       }
     }
 
-    public static ObjectInspector getObjectInspector(String serdeClassName, 
-        Configuration conf, Properties tbl) throws Exception {
-      SerDe s = (SerDe) Class.forName(serdeClassName).newInstance();
-      s.initialize(conf, tbl);
-      return s.getObjectInspector();
-    }
-
-    public static ObjectInspector getHCatRecordObjectInspector(HCatSchema hsch) throws Exception{
-      HCatRecordSerDe hrsd = new HCatRecordSerDe();
-      hrsd.initialize(hsch);
-      return hrsd.getObjectInspector();
-    }
-
-    public static void configureOutputStorageHandler(HCatStorageHandler storageHandler,
-                                                                              JobContext context,
-                                                                              OutputJobInfo outputJobInfo) {
-        //TODO replace IgnoreKeyTextOutputFormat with a HiveOutputFormatWrapper in StorageHandler
+    public static Map<String, String>
+      getInputJobProperties(HCatStorageHandler storageHandler,
+                            InputJobInfo inputJobInfo) {
         TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
-                                                                   storageHandler.getInputFormatClass(),
-                                                                   IgnoreKeyTextOutputFormat.class,
-                                                                   outputJobInfo.getTableInfo().getStorerInfo().getProperties());
+                  storageHandler.getInputFormatClass(),
+                  storageHandler.getOutputFormatClass(),
+                  inputJobInfo.getTableInfo().getStorerInfo().getProperties());
+        if(tableDesc.getJobProperties() == null) {
+            tableDesc.setJobProperties(new HashMap<String, String>());
+        }
+
+        Map<String,String> jobProperties = new HashMap<String,String>();
+        try {
+            tableDesc.getJobProperties().put(
+                HCatConstants.HCAT_KEY_JOB_INFO, 
+                HCatUtil.serialize(inputJobInfo));
+
+            storageHandler.configureInputJobProperties(tableDesc,
+                                                       jobProperties);
+
+        } catch (IOException e) {
+            throw new IllegalStateException(
+                "Failed to configure StorageHandler",e);
+        }
+        
+        return jobProperties;
+    }
+
+
+    public static void 
+      configureOutputStorageHandler(HCatStorageHandler storageHandler,
+                                    JobContext context,
+                                    OutputJobInfo outputJobInfo) {
+        //TODO replace IgnoreKeyTextOutputFormat with a 
+        //HiveOutputFormatWrapper in StorageHandler
+        TableDesc tableDesc = new TableDesc(storageHandler.getSerDeClass(),
+                  storageHandler.getInputFormatClass(),
+                  IgnoreKeyTextOutputFormat.class,
+                  outputJobInfo.getTableInfo().getStorerInfo().getProperties());
         if(tableDesc.getJobProperties() == null)
             tableDesc.setJobProperties(new HashMap<String, String>());
         for (Map.Entry<String,String> el: context.getConfiguration()) {
@@ -555,15 +579,19 @@ public class HCatUtil {
 
         Map<String,String> jobProperties = new HashMap<String,String>();
         try {
-            tableDesc.getJobProperties().put(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
+            tableDesc.getJobProperties().put(
+                HCatConstants.HCAT_KEY_OUTPUT_INFO, 
+                HCatUtil.serialize(outputJobInfo));
 
-            storageHandler.configureOutputJobProperties(tableDesc,jobProperties);
+            storageHandler.configureOutputJobProperties(tableDesc,
+                                                        jobProperties);
 
             for(Map.Entry<String,String> el: jobProperties.entrySet()) {
                 context.getConfiguration().set(el.getKey(),el.getValue());
             }
         } catch (IOException e) {
-            throw new IllegalStateException("Failed to configure StorageHandler",e);
+            throw new IllegalStateException(
+                "Failed to configure StorageHandler",e);
         }
     }
 
@@ -579,4 +607,96 @@ public class HCatUtil {
         }
     }
 
+    //TODO remove url component, everything should be encapsulated in HiveConf
+    public static HiveMetaStoreClient createHiveClient(HiveConf hiveConf) 
+    throws MetaException {
+      return new HiveMetaStoreClient(hiveConf);
+    }
+
+
+    public static HiveConf getHiveConf(String url, Configuration conf) 
+      throws IOException {
+      HiveConf hiveConf = new HiveConf();
+
+      if( url != null ) {
+        //User specified a thrift url
+
+        hiveConf.set("hive.metastore.local", "false");
+        hiveConf.set(ConfVars.METASTOREURIS.varname, url);
+
+        String kerberosPrincipal = conf.get(
+                                   HCatConstants.HCAT_METASTORE_PRINCIPAL);
+        if (kerberosPrincipal == null){
+            kerberosPrincipal = conf.get(
+                                ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+        }
+        if (kerberosPrincipal != null){
+            hiveConf.setBoolean(
+                    ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+            hiveConf.set(
+                    ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, 
+                    kerberosPrincipal);
+        }
+      } else {
+        //Thrift url is null, copy the hive conf into 
+        //the job conf and restore it
+        //in the backend context
+
+        if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
+          conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, 
+                   HCatUtil.serialize(hiveConf.getAllProperties()));
+        } else {
+          //Copy configuration properties into the hive conf
+          Properties properties = (Properties) HCatUtil.deserialize(
+                                  conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+          for(Map.Entry<Object, Object> prop : properties.entrySet() ) {
+            if( prop.getValue() instanceof String ) {
+              hiveConf.set((String) prop.getKey(), (String) prop.getValue());
+            } else if( prop.getValue() instanceof Integer ) {
+              hiveConf.setInt((String) prop.getKey(), 
+                              (Integer) prop.getValue());
+            } else if( prop.getValue() instanceof Boolean ) {
+              hiveConf.setBoolean((String) prop.getKey(), 
+                                  (Boolean) prop.getValue());
+            } else if( prop.getValue() instanceof Long ) {
+              hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
+            } else if( prop.getValue() instanceof Float ) {
+              hiveConf.setFloat((String) prop.getKey(), 
+                                (Float) prop.getValue());
+            }
+          }
+        }
+
+      }
+
+      if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+        hiveConf.set("hive.metastore.token.signature", 
+                     conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+      }
+
+      return hiveConf;
+    }
+
+
+    public static JobConf getJobConfFromContext(JobContext jobContext) 
+    {
+      JobConf jobConf;
+      // we need to convert the jobContext into a jobConf
+      // 0.18 jobConf (Hive) vs 0.20+ jobContext (HCat)
+      // begin conversion..
+      jobConf = new JobConf(jobContext.getConfiguration());
+      // ..end of conversion
+
+      
+      return jobConf;
+    }
+
+    public static void copyJobPropertiesToJobConf(
+                    Map<String, String>jobProperties, JobConf jobConf)
+    {
+      for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
+        jobConf.set(entry.getKey(), entry.getValue());
+      }
+    }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/DataType.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/DataType.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/DataType.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/DataType.java Sat Feb 25 00:45:13 2012
@@ -170,4 +170,4 @@ public abstract class DataType {
       return dt1 < dt2 ? -1 : 1;
     }
   }
-}
\ No newline at end of file
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/HCatRecordObjectInspectorFactory.java Sat Feb 25 00:45:13 2012
@@ -68,6 +68,7 @@ public class HCatRecordObjectInspectorFa
           fieldObjectInspectors.add(getStandardObjectInspectorFromTypeInfo(fieldTypeInfos.get(i)));
         }
         oi = new HCatRecordObjectInspector(fieldNames,fieldObjectInspectors);
+
         break;
       default: 
         // Hmm.. not good, 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/LazyHCatRecord.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/LazyHCatRecord.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/LazyHCatRecord.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/LazyHCatRecord.java Sat Feb 25 00:45:13 2012
@@ -5,6 +5,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,9 +29,12 @@ public class LazyHCatRecord extends HCat
 
   private Object o;
   private StructObjectInspector soi;
+  private Map<Integer, Object> partCols;
   
   @Override
   public Object get(int fieldNum) {
+    Object pc = partCols.get(fieldNum);
+    if (pc != null) return pc;
     try {
       StructField fref = soi.getAllStructFieldRefs().get(fieldNum);
       return HCatRecordSerDe.serializeField(
@@ -59,7 +63,7 @@ public class LazyHCatRecord extends HCat
 
   @Override
   public int size() {
-    return soi.getAllStructFieldRefs().size();
+    return soi.getAllStructFieldRefs().size() + partCols.size();
   }
 
   @Override
@@ -99,17 +103,20 @@ public class LazyHCatRecord extends HCat
     throw new UnsupportedOperationException("not allowed to run copy() on LazyHCatRecord");
   }
   
-  public LazyHCatRecord(Object o, ObjectInspector oi) throws Exception{
+  public LazyHCatRecord(Object o, ObjectInspector oi, 
+                        Map<Integer, Object> partCols) 
+  throws Exception {
 
     if (oi.getCategory() != Category.STRUCT) {
       throw new SerDeException(getClass().toString()
-          + " can only make a lazy hcat record from objects of struct types, but we got: "
+          + " can only make a lazy hcat record from objects of " + 
+          "struct types, but we got: "
           + oi.getTypeName());
     }
 
     this.soi = (StructObjectInspector)oi;
     this.o = o;
-
+    this.partCols = partCols;
   }
 
   @Override

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/DefaultOutputCommitterContainer.java Sat Feb 25 00:45:13 2012
@@ -20,6 +20,7 @@ package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.mapred.HCatMapRedUtil;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -27,6 +28,7 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 
 /**
  * Part of the DefaultOutput*Container classes
@@ -88,7 +90,9 @@ class DefaultOutputCommitterContainer ex
 
         //Cancel HCat and JobTracker tokens
         try {
-            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration());
+            HiveConf hiveConf = HCatUtil.getHiveConf(null, 
+                                                  context.getConfiguration());
+            HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
             String tokenStrForm = client.getTokenStrForm();
             if(tokenStrForm != null && context.getConfiguration().get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
               client.cancelDelegationToken(tokenStrForm);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java Sat Feb 25 00:45:13 2012
@@ -161,11 +161,14 @@ class FileOutputCommitterContainer exten
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
 
         try {
-            HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, jobContext.getConfiguration());
+            HiveConf hiveConf = HCatUtil.getHiveConf(null, 
+                                                jobContext.getConfiguration());
+            HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
             // cancel the deleg. tokens that were acquired for this job now that
             // we are done - we should cancel if the tokens were acquired by
-            // HCatOutputFormat and not if they were supplied by Oozie. In the latter
-            // case the HCAT_KEY_TOKEN_SIGNATURE property in the conf will not be set
+            // HCatOutputFormat and not if they were supplied by Oozie.
+            // In the latter case the HCAT_KEY_TOKEN_SIGNATURE property in 
+            // the conf will not be set
             String tokenStrForm = client.getTokenStrForm();
             if(tokenStrForm != null && jobContext.getConfiguration().get
                     (HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
@@ -280,9 +283,10 @@ class FileOutputCommitterContainer exten
         List<Partition> partitionsAdded = new ArrayList<Partition>();
 
         try {
-            client = HCatOutputFormat.createHiveClient(null, conf);
+            HiveConf hiveConf = HCatUtil.getHiveConf(null, conf);
+            client = HCatUtil.createHiveClient(hiveConf);
 
-            StorerInfo storer = InitializeInput.extractStorerInfo(table.getSd(),table.getParameters());
+            StorerInfo storer = InternalUtil.extractStorerInfo(table.getSd(),table.getParameters());
 
             updateTableSchema(client, table, jobInfo.getOutputSchema());
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputFormatContainer.java Sat Feb 25 00:45:13 2012
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -91,9 +92,11 @@ class FileOutputFormatContainer extends 
     public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
         OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(context);
         try {
+            HiveConf hiveConf = HCatUtil.getHiveConf(null, 
+                                              context.getConfiguration());
             handleDuplicatePublish(context,
                     jobInfo,
-                    HCatOutputFormat.createHiveClient(null,context.getConfiguration()),
+                    HCatUtil.createHiveClient(hiveConf),
                     jobInfo.getTableInfo().getTable());
         } catch (MetaException e) {
             throw new IOException(e);

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java Sat Feb 25 00:45:13 2012
@@ -20,6 +20,7 @@ import org.apache.hcatalog.common.HCatCo
 import org.apache.hcatalog.common.HCatUtil;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +30,8 @@ import java.util.Map;
  *  artifacts of tables which don't define a SerDe. This StorageHandler assumes
  *  the supplied storage artifacts are for a file-based storage system.
  */
-public class FosterStorageHandler extends HCatStorageHandler {
+public class FosterStorageHandler extends HCatStorageHandler 
+  implements Serializable {
 
     public Configuration conf
             ;
@@ -77,21 +79,27 @@ public class FosterStorageHandler extend
     }
 
     @Override
-    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    public void configureInputJobProperties(TableDesc tableDesc, 
+                                            Map<String, String> jobProperties) {
 
     }
 
     @Override
-    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+    public void configureOutputJobProperties(TableDesc tableDesc, 
+                                      Map<String, String> jobProperties) {
         try {
-            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(tableDesc.getJobProperties().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+            OutputJobInfo jobInfo = (OutputJobInfo)
+              HCatUtil.deserialize(tableDesc.getJobProperties().get(
+                                      HCatConstants.HCAT_KEY_OUTPUT_INFO));
             String parentPath = jobInfo.getTableInfo().getTableLocation();
-            String dynHash = tableDesc.getJobProperties().get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+            String dynHash = tableDesc.getJobProperties().get(
+                                      HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
 
             // For dynamic partitioned writes without all keyvalues specified,
             // we create a temp dir for the associated write job
             if (dynHash != null){
-                parentPath = new Path(parentPath, DYNTEMP_DIR_NAME+dynHash).toString();
+                parentPath = new Path(parentPath, 
+                                      DYNTEMP_DIR_NAME+dynHash).toString();
             }
 
             String outputLocation;
@@ -105,7 +113,9 @@ public class FosterStorageHandler extend
                 List<String> values = new ArrayList<String>();
 
                 //sort the cols and vals
-                for(String name: jobInfo.getTableInfo().getPartitionColumns().getFieldNames()) {
+                for(String name: 
+                    jobInfo.getTableInfo().
+                    getPartitionColumns().getFieldNames()) {
                     String value = jobInfo.getPartitionValues().get(name);
                     int i=0;
                     while(i <cols.size() && name.compareTo(cols.get(i)) > 0)
@@ -119,15 +129,17 @@ public class FosterStorageHandler extend
             jobInfo.setLocation(new Path(parentPath,outputLocation).toString());
 
             //only set output dir if partition is fully materialized
-            if(jobInfo.getPartitionValues().size() == jobInfo.getTableInfo().getPartitionColumns().size()) {
+            if(jobInfo.getPartitionValues().size() 
+                == jobInfo.getTableInfo().getPartitionColumns().size()) {
                 jobProperties.put("mapred.output.dir", jobInfo.getLocation());
             }
 
             //TODO find a better home for this, RCFile specifc
             jobProperties.put(RCFile.COLUMN_NUMBER_CONF_STR,
-                                      Integer.toOctalString(jobInfo.getOutputSchema().getFields().size()));
+                              Integer.toOctalString(
+                                jobInfo.getOutputSchema().getFields().size()));
             jobProperties.put(HCatConstants.HCAT_KEY_OUTPUT_INFO,
-                                      HCatUtil.serialize(jobInfo));
+                              HCatUtil.serialize(jobInfo));
         } catch (IOException e) {
             throw new IllegalStateException("Failed to set output path",e);
         }
@@ -135,7 +147,8 @@ public class FosterStorageHandler extend
     }
 
     @Override
-    OutputFormatContainer getOutputFormatContainer(org.apache.hadoop.mapred.OutputFormat outputFormat) {
+    OutputFormatContainer getOutputFormatContainer(
+              org.apache.hadoop.mapred.OutputFormat outputFormat) {
         return new FileOutputFormatContainer(outputFormat);
     }
 
@@ -150,7 +163,8 @@ public class FosterStorageHandler extend
     }
 
     @Override
-    public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+    public HiveAuthorizationProvider getAuthorizationProvider() 
+      throws HiveException {
         return new DefaultHiveAuthorizationProvider();
     }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Sat Feb 25 00:45:13 2012
@@ -21,22 +21,49 @@ package org.apache.hcatalog.mapreduce;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Properties;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDe;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
-public abstract class HCatBaseInputFormat extends InputFormat<WritableComparable, HCatRecord> {
+public abstract class HCatBaseInputFormat 
+  extends InputFormat<WritableComparable, HCatRecord> {
   
   /**
    * get the schema for the HCatRecord data returned by HCatInputFormat.
@@ -44,8 +71,13 @@ public abstract class HCatBaseInputForma
    * @param context the jobContext
    * @throws IllegalArgumentException
    */
-  public static HCatSchema getOutputSchema(JobContext context) throws Exception {
-    String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+  private Class<? extends InputFormat> inputFileFormatClass;
+
+  // TODO needs to go in InitializeInput? as part of InputJobInfo
+  public static HCatSchema getOutputSchema(JobContext context) 
+    throws IOException {
+    String os = context.getConfiguration().get(
+                                HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
     if (os == null) {
       return getTableSchema(context);
     } else {
@@ -58,10 +90,19 @@ public abstract class HCatBaseInputForma
    * @param job the job object
    * @param hcatSchema the schema to use as the consolidated schema
    */
-  public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception {
-    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+  public static void setOutputSchema(Job job,HCatSchema hcatSchema) 
+    throws IOException {
+    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, 
+                               HCatUtil.serialize(hcatSchema));
   }
 
+  private static 
+    org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>
+    getMapRedInputFormat (JobConf job, Class inputFormatClass) throws IOException {
+      return (
+          org.apache.hadoop.mapred.InputFormat<WritableComparable, Writable>) 
+        ReflectionUtils.newInstance(inputFormatClass, job);
+  }
 
   /**
    * Logically split the set of input files for the job. Returns the
@@ -91,34 +132,39 @@ public abstract class HCatBaseInputForma
       return splits;
     }
 
+    HCatStorageHandler storageHandler;
+    JobConf jobConf;
+    Configuration conf = jobContext.getConfiguration();
     //For each matching partition, call getSplits on the underlying InputFormat
     for(PartInfo partitionInfo : partitionInfoList) {
-      Job localJob = new Job(jobContext.getConfiguration());
-      HCatInputStorageDriver storageDriver;
-      try {
-        storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
+      jobConf = HCatUtil.getJobConfFromContext(jobContext);
+      setInputPath(jobConf, partitionInfo.getLocation());
+      Map<String,String> jobProperties = partitionInfo.getJobProperties();
 
       HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
+      for(HCatFieldSchema field: 
+          inputJobInfo.getTableInfo().getDataColumns().getFields())
           allCols.append(field);
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+      for(HCatFieldSchema field: 
+          inputJobInfo.getTableInfo().getPartitionColumns().getFields())
           allCols.append(field);
 
-      //Pass all required information to the storage driver
-      initStorageDriver(storageDriver, localJob, partitionInfo, allCols);
+      HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+
+      storageHandler = partitionInfo.getStorageHandler();
 
-      //Get the input format for the storage driver
-      InputFormat inputFormat =
-        storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+      //Get the input format
+      Class inputFormatClass = storageHandler.getInputFormatClass();
+      org.apache.hadoop.mapred.InputFormat inputFormat = 
+                            getMapRedInputFormat(jobConf, inputFormatClass);
 
-      //Call getSplit on the storage drivers InputFormat, create an
+      //Call getSplit on the InputFormat, create an
       //HCatSplit for each underlying split
-      List<InputSplit> baseSplits = inputFormat.getSplits(localJob);
+      //NumSplits is 0 for our purposes
+      org.apache.hadoop.mapred.InputSplit[] baseSplits = 
+        inputFormat.getSplits(jobConf, 0);
 
-      for(InputSplit split : baseSplits) {
+      for(org.apache.hadoop.mapred.InputSplit split : baseSplits) {
         splits.add(new HCatSplit(
             partitionInfo,
             split,
@@ -141,36 +187,66 @@ public abstract class HCatBaseInputForma
    * @throws IOException or InterruptedException
    */
   @Override
-  public RecordReader<WritableComparable, HCatRecord> createRecordReader(InputSplit split,
+  public RecordReader<WritableComparable, HCatRecord> 
+  createRecordReader(InputSplit split,
       TaskAttemptContext taskContext) throws IOException, InterruptedException {
 
     HCatSplit hcatSplit = (HCatSplit) split;
     PartInfo partitionInfo = hcatSplit.getPartitionInfo();
+    JobContext jobContext = taskContext;
+
+    HCatStorageHandler storageHandler = partitionInfo.getStorageHandler();
+    JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
 
-    //If running through a Pig job, the InputJobInfo will not be available in the
-    //backend process context (since HCatLoader works on a copy of the JobContext and does
-    //not call HCatInputFormat.setInput in the backend process).
-    //So this function should NOT attempt to read the InputJobInfo.
+    Class inputFormatClass = storageHandler.getInputFormatClass();
+    org.apache.hadoop.mapred.InputFormat inputFormat = 
+                              getMapRedInputFormat(jobConf, inputFormatClass);
+
+    Map<String, String> jobProperties = partitionInfo.getJobProperties();
+    HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);
+    Reporter reporter = InternalUtil.createReporter(taskContext);
+    org.apache.hadoop.mapred.RecordReader recordReader =
+      inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, reporter);
 
-    HCatInputStorageDriver storageDriver;
+    SerDe serde;
     try {
-      storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+      serde = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), 
+                                          jobContext.getConfiguration());
+
+//    HCatUtil.logEntrySet(LOG, "props to serde", properties.entrySet());
+
+      Configuration conf = storageHandler.getConf();
+      InternalUtil.initializeInputSerDe(serde, conf, 
+                                  partitionInfo.getTableInfo());
+                                  
     } catch (Exception e) {
-      throw new IOException(e);
+      throw new IOException("Unable to create objectInspector "
+          + "for serde class " + storageHandler.getSerDeClass().getName()
+          + e);
     }
 
-    //Pass all required information to the storage driver
-    initStorageDriver(storageDriver, taskContext, partitionInfo, hcatSplit.getTableSchema());
+    Map<Integer,Object> partCols = getPartColsByPosition(partitionInfo, 
+                                                        hcatSplit);
 
-    //Get the input format for the storage driver
-    InputFormat inputFormat =
-      storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
-
-    //Create the underlying input formats record record and an HCat wrapper
-    RecordReader recordReader =
-      inputFormat.createRecordReader(hcatSplit.getBaseSplit(), taskContext);
+    HCatRecordReader hcatRecordReader = new HCatRecordReader(storageHandler, 
+                                                             recordReader, 
+                                                             serde, 
+                                                             partCols);
+    return hcatRecordReader;
+  }
+
+  /** gets the partition columns that are not part of the Hive storage */
+  private static Map<Integer, Object> getPartColsByPosition(PartInfo partInfo, 
+                                                            HCatSplit split)
+  {
+    Map<Integer, Object> partCols = new HashMap<Integer, Object>();
+
+    for (String partitionKey : partInfo.getPartitionValues().keySet()) {
+      partCols.put(split.getSchema().getPosition(partitionKey), 
+                   partInfo.getPartitionValues().get(partitionKey));
+    }
 
-    return new HCatRecordReader(storageDriver,recordReader);
+    return partCols;
   }
 
   /**
@@ -179,14 +255,18 @@ public abstract class HCatBaseInputForma
    * has been called for a JobContext.
    * @param context the context
    * @return the table schema
-   * @throws Exception if HCatInputFromat.setInput has not been called for the current context
+   * @throws IOException if HCatInputFormat.setInput has not been called 
+   *                     for the current context
    */
-  public static HCatSchema getTableSchema(JobContext context) throws Exception {
+  public static HCatSchema getTableSchema(JobContext context) 
+  throws IOException {
     InputJobInfo inputJobInfo = getJobInfo(context);
       HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getDataColumns().getFields())
+      for(HCatFieldSchema field: 
+          inputJobInfo.getTableInfo().getDataColumns().getFields())
           allCols.append(field);
-      for(HCatFieldSchema field: inputJobInfo.getTableInfo().getPartitionColumns().getFields())
+      for(HCatFieldSchema field: 
+          inputJobInfo.getTableInfo().getPartitionColumns().getFields())
           allCols.append(field);
     return allCols;
   }
@@ -197,73 +277,74 @@ public abstract class HCatBaseInputForma
    * exception since that means HCatInputFormat.setInput has not been called.
    * @param jobContext the job context
    * @return the InputJobInfo object
-   * @throws Exception the exception
+   * @throws IOException the exception
    */
-  private static InputJobInfo getJobInfo(JobContext jobContext) throws Exception {
-    String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+  private static InputJobInfo getJobInfo(JobContext jobContext) 
+    throws IOException {
+    String jobString = jobContext.getConfiguration().get(
+                                  HCatConstants.HCAT_KEY_JOB_INFO);
     if( jobString == null ) {
-      throw new Exception("job information not found in JobContext. HCatInputFormat.setInput() not called?");
+      throw new IOException("job information not found in JobContext."
+         + " HCatInputFormat.setInput() not called?");
     }
 
     return (InputJobInfo) HCatUtil.deserialize(jobString);
   }
 
+  private void setInputPath(JobConf jobConf, String location) 
+  throws IOException{
 
-  /**
-   * Initializes the storage driver instance. Passes on the required
-   * schema information, path info and arguments for the supported
-   * features to the storage driver.
-   * @param storageDriver the storage driver
-   * @param context the job context
-   * @param partitionInfo the partition info
-   * @param tableSchema the table level schema
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-  private void initStorageDriver(HCatInputStorageDriver storageDriver,
-      JobContext context, PartInfo partitionInfo,
-      HCatSchema tableSchema) throws IOException {
-
-    storageDriver.setInputPath(context, partitionInfo.getLocation());
-
-    if( partitionInfo.getPartitionSchema() != null ) {
-      storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
-    }
-
-    storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues());
-
-    //Set the output schema. Use the schema given by user if set, otherwise use the
-    //table level schema
-    HCatSchema outputSchema = null;
-    String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
-    if( outputSchemaString != null ) {
-      outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString);
-    } else {
-      outputSchema = tableSchema;
+    // ideally we should just call FileInputFormat.setInputPaths() here - but
+    // that won't work since FileInputFormat.setInputPaths() needs
+    // a Job object instead of a JobContext which we are handed here
+
+    int length = location.length();
+    int curlyOpen = 0;
+    int pathStart = 0;
+    boolean globPattern = false;
+    List<String> pathStrings = new ArrayList<String>();
+
+    for (int i=0; i<length; i++) {
+      char ch = location.charAt(i);
+      switch(ch) {
+      case '{' : {
+        curlyOpen++;
+        if (!globPattern) {
+          globPattern = true;
+        }
+        break;
+      }
+      case '}' : {
+        curlyOpen--;
+        if (curlyOpen == 0 && globPattern) {
+          globPattern = false;
+        }
+        break;
+      }
+      case ',' : {
+        if (!globPattern) {
+          pathStrings.add(location.substring(pathStart, i));
+          pathStart = i + 1 ;
+        }
+        break;
+      }
+      }
     }
+    pathStrings.add(location.substring(pathStart, length));
 
-    storageDriver.setOutputSchema(context, outputSchema);
+    Path[] paths = StringUtils.stringToPath(pathStrings.toArray(new String[0]));
 
-    storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties());
-  }
-
-  /**
-   * Gets the input driver instance.
-   * @param inputStorageDriverClass the input storage driver classname
-   * @return the input driver instance
-   * @throws Exception
-   */
-  @SuppressWarnings("unchecked")
-  private HCatInputStorageDriver getInputDriverInstance(
-      String inputStorageDriverClass) throws Exception {
-    try {
-      Class<? extends HCatInputStorageDriver> driverClass =
-        (Class<? extends HCatInputStorageDriver>)
-        Class.forName(inputStorageDriverClass);
-      return driverClass.newInstance();
-    } catch(Exception e) {
-      throw new Exception("error creating storage driver " +
-          inputStorageDriverClass, e);
+    FileSystem fs = FileSystem.get(jobConf);
+    Path path = paths[0].makeQualified(fs);
+    StringBuilder str = new StringBuilder(StringUtils.escapeString(
+                                                          path.toString()));
+    for(int i = 1; i < paths.length;i++) {
+      str.append(StringUtils.COMMA_STR);
+      path = paths[i].makeQualified(fs);
+      str.append(StringUtils.escapeString(path.toString()));
     }
+
+    jobConf.set("mapred.input.dir", str.toString());
   }
 
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Sat Feb 25 00:45:13 2012
@@ -26,6 +26,7 @@ import java.util.Map;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -226,7 +227,9 @@ public abstract class HCatBaseOutputForm
   }
 
   static void cancelDelegationTokens(JobContext context, OutputJobInfo outputJobInfo) throws Exception {
-    HiveMetaStoreClient client = HCatOutputFormat.createHiveClient(null, context.getConfiguration());
+    HiveConf hiveConf = HCatUtil.getHiveConf(null, 
+                                             context.getConfiguration());
+    HiveMetaStoreClient client = HCatUtil.createHiveClient(hiveConf);
     // cancel the deleg. tokens that were acquired for this job now that
     // we are done - we should cancel if the tokens were acquired by
     // HCatOutputFormat and not if they were supplied by Oozie. In the latter

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Sat Feb 25 00:45:13 2012
@@ -69,11 +69,13 @@ public class HCatOutputFormat extends HC
     @SuppressWarnings("unchecked")
     public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException {
       HiveMetaStoreClient client = null;
+      HiveConf hiveConf = null;
 
       try {
 
         Configuration conf = job.getConfiguration();
-        client = createHiveClient(null, conf);
+        hiveConf = HCatUtil.getHiveConf(null, conf);
+        client = HCatUtil.createHiveClient(hiveConf);
         Table table = client.getTable(outputJobInfo.getDatabaseName(), outputJobInfo.getTableName());
 
         if (table.getPartitionKeysSize() == 0 ){
@@ -141,7 +143,9 @@ public class HCatOutputFormat extends HC
         //Serialize the output info into the configuration
         outputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
         outputJobInfo.setOutputSchema(tableSchema);
+        harRequested = getHarRequested(hiveConf);
         outputJobInfo.setHarRequested(harRequested);
+        maxDynamicPartitions = getMaxDynamicPartitions(hiveConf);
         outputJobInfo.setMaximumDynamicPartitions(maxDynamicPartitions);
 
         HCatUtil.configureOutputStorageHandler(storageHandler,job,outputJobInfo);
@@ -222,76 +226,21 @@ public class HCatOutputFormat extends HC
         return getOutputFormat(context).getOutputCommitter(context);
     }
 
-    //TODO remove url component, everything should be encapsulated in HiveConf
-    static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
-      HiveConf hiveConf = getHiveConf(url, conf);
-//      HCatUtil.logHiveConf(LOG, hiveConf);
-      try {
-        return new HiveMetaStoreClient(hiveConf);
-      } catch (MetaException e) {
-        LOG.error("Error connecting to the metastore (conf follows): "+e.getMessage(), e);
-        HCatUtil.logHiveConf(LOG, hiveConf);
-        throw e;
-      }
-    }
-
-
-    static HiveConf getHiveConf(String url, Configuration conf) throws IOException {
-      HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
-
-      if( url != null ) {
-        //User specified a thrift url
-
-        hiveConf.set("hive.metastore.local", "false");
-        hiveConf.set(ConfVars.METASTOREURIS.varname, url);
-
-        String kerberosPrincipal = conf.get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
-        if (kerberosPrincipal == null){
-            kerberosPrincipal = conf.get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
-        }
-        if (kerberosPrincipal != null){
-            hiveConf.setBoolean(ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
-            hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
-        }
-      } else {
-        //Thrift url is null, copy the hive conf into the job conf and restore it
-        //in the backend context
-
-        if( conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null ) {
-          conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(hiveConf.getAllProperties()));
-        } else {
-          //Copy configuration properties into the hive conf
-          Properties properties = (Properties) HCatUtil.deserialize(conf.get(HCatConstants.HCAT_KEY_HIVE_CONF));
-
-          for(Map.Entry<Object, Object> prop : properties.entrySet() ) {
-            if( prop.getValue() instanceof String ) {
-              hiveConf.set((String) prop.getKey(), (String) prop.getValue());
-            } else if( prop.getValue() instanceof Integer ) {
-              hiveConf.setInt((String) prop.getKey(), (Integer) prop.getValue());
-            } else if( prop.getValue() instanceof Boolean ) {
-              hiveConf.setBoolean((String) prop.getKey(), (Boolean) prop.getValue());
-            } else if( prop.getValue() instanceof Long ) {
-              hiveConf.setLong((String) prop.getKey(), (Long) prop.getValue());
-            } else if( prop.getValue() instanceof Float ) {
-              hiveConf.setFloat((String) prop.getKey(), (Float) prop.getValue());
-            }
-          }
-        }
+    private static int getMaxDynamicPartitions(HiveConf hConf) {
+      // by default the bounds checking for maximum number of 
+      // dynamic partitions is disabled (-1)
+      int maxDynamicPartitions = -1;
 
-      }
-
-      if(conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
-        hiveConf.set("hive.metastore.token.signature", conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
-      }
-
-      // figure out what the maximum number of partitions allowed is, so we can pass it on to our outputinfo
       if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
-        maxDynamicPartitions = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
-      }else{
-        maxDynamicPartitions = -1; // disables bounds checking for maximum number of dynamic partitions
+        maxDynamicPartitions = hConf.getIntVar(
+                                HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
       }
-      harRequested = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
-      return hiveConf;
+
+      return maxDynamicPartitions;
     }
 
+    private static boolean getHarRequested(HiveConf hConf) {
+      return hConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
+    }
+   
 }

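For readers following the HCatOutputFormat refactoring above: the two new private helpers simply centralize reads of two HiveConf settings that were previously buried in getHiveConf(). A minimal stand-alone sketch of that lookup pattern follows; the class name and the default-constructed HiveConf are illustrative assumptions, since the real code reuses the job's metastore-aware configuration.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class OutputLimitsSketch {
        public static void main(String[] args) {
            // Illustration only: a defaults-based HiveConf, not the job's metastore conf.
            HiveConf hiveConf = new HiveConf(OutputLimitsSketch.class);
            // Bound on dynamic partitions; the patch returns -1 (no bound) when the
            // dynamic-partition check is disabled.
            int maxDynamicPartitions =
                hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
            // Whether Hadoop archiving (HAR) of partitions has been requested.
            boolean harRequested =
                hiveConf.getBoolVar(HiveConf.ConfVars.HIVEARCHIVEENABLED);
            System.out.println("max dynamic partitions = " + maxDynamicPartitions
                + ", har requested = " + harRequested);
        }
    }
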
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Sat Feb 25 00:45:13 2012
@@ -18,71 +18,118 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.SerDe;
+
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.LazyHCatRecord;
 
-/** The HCat wrapper for the underlying RecordReader, this ensures that the initialize on
- * the underlying record reader is done with the underlying split, not with HCatSplit.
+/** The HCat wrapper for the underlying RecordReader, 
+ * this ensures that the initialize on
+ * the underlying record reader is done with the underlying split, 
+ * not with HCatSplit.
  */
-class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> 
-  implements org.apache.hadoop.mapred.RecordReader {
+class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
   
     Log LOG = LogFactory.getLog(HCatRecordReader.class);
-    int lineCount = 0;
+    WritableComparable currentKey;
+    Writable currentValue;
 
     /** The underlying record reader to delegate to. */
-    private final RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader;
+    //org.apache.hadoop.mapred.
+    private final org.apache.hadoop.mapred.RecordReader
+      <WritableComparable, Writable> baseRecordReader;
+
+    /** The storage handler used */
+    private final HCatStorageHandler storageHandler;
 
-    /** The storage driver used */
-    private final HCatInputStorageDriver storageDriver;
+    private SerDe serde;
+
+    private Map<Integer,Object> partCols;
 
     /**
      * Instantiates a new hcat record reader.
      * @param baseRecordReader the base record reader
      */
-    public HCatRecordReader(HCatInputStorageDriver storageDriver, RecordReader<? extends WritableComparable, ? extends Writable> baseRecordReader) {
-        this.baseRecordReader = baseRecordReader;
-        this.storageDriver = storageDriver;
-    }
-
-    /* (non-Javadoc)
-     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(org.apache.hadoop.mapreduce.InputSplit, org.apache.hadoop.mapreduce.TaskAttemptContext)
+    public HCatRecordReader(HCatStorageHandler storageHandler, 
+        org.apache.hadoop.mapred.RecordReader<WritableComparable, 
+                     Writable> baseRecordReader, 
+                     SerDe serde, 
+                     Map<Integer,Object> partCols) {
+      this.baseRecordReader = baseRecordReader;
+      this.storageHandler = storageHandler;
+      this.serde = serde;
+      this.partCols = partCols;
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.hadoop.mapreduce.RecordReader#initialize(
+     * org.apache.hadoop.mapreduce.InputSplit, 
+     * org.apache.hadoop.mapreduce.TaskAttemptContext)
      */
     @Override
-    public void initialize(InputSplit split, TaskAttemptContext taskContext)
+    public void initialize(org.apache.hadoop.mapreduce.InputSplit split, 
+                           TaskAttemptContext taskContext)
     throws IOException, InterruptedException {
-        InputSplit baseSplit = split;
+        org.apache.hadoop.mapred.InputSplit baseSplit;
 
         if( split instanceof HCatSplit ) {
             baseSplit = ((HCatSplit) split).getBaseSplit();
+        } else {
+          throw new IOException("Not a HCatSplit");
         }
 
-        baseRecordReader.initialize(baseSplit, taskContext);
+        Properties properties = new Properties();
+        for (Map.Entry<String, String>param : 
+            ((HCatSplit)split).getPartitionInfo()
+                              .getJobProperties().entrySet()) {
+          properties.setProperty(param.getKey(), param.getValue());
+        }
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
      */
     @Override
-    public WritableComparable getCurrentKey() throws IOException, InterruptedException {
-        return baseRecordReader.getCurrentKey();
+    public WritableComparable getCurrentKey() 
+    throws IOException, InterruptedException {
+      return currentKey;
     }
 
     /* (non-Javadoc)
      * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
      */
     @Override
-    public HCatRecord getCurrentValue() throws IOException, InterruptedException {
-        HCatRecord r = storageDriver.convertToHCatRecord(baseRecordReader.getCurrentKey(),baseRecordReader.getCurrentValue());
-        return r; 
+    public HCatRecord getCurrentValue() 
+    throws IOException, InterruptedException {
+      HCatRecord r;
+
+      try {
+        r = new DefaultHCatRecord((new LazyHCatRecord(
+                                            serde.deserialize(currentValue), 
+                               serde.getObjectInspector(), 
+                               partCols)).getAll());
+      } catch (Exception e) { 
+        throw new IOException("Failed to create HCatRecord " + e);
+      }
+      return r; 
     }
 
     /* (non-Javadoc)
@@ -95,9 +142,6 @@ class HCatRecordReader extends RecordRea
         } catch (IOException e) {
           LOG.warn(e.getMessage());
           LOG.warn(e.getStackTrace());
-        } catch (InterruptedException e) {
-          LOG.warn(e.getMessage());
-          LOG.warn(e.getStackTrace());
         }
         return 0.0f; // errored
     }
@@ -107,8 +151,13 @@ class HCatRecordReader extends RecordRea
      */
     @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
-        lineCount++;
-        return baseRecordReader.nextKeyValue();
+      if (currentKey == null) {
+        currentKey = baseRecordReader.createKey();
+        currentValue = baseRecordReader.createValue();
+      }
+
+        return baseRecordReader.next(currentKey, 
+                                     currentValue);
     }
 
     /* (non-Javadoc)
@@ -119,45 +168,4 @@ class HCatRecordReader extends RecordRea
         baseRecordReader.close();
     }
 
-    @Override
-    public Object createKey() {
-      WritableComparable o = null;
-      try {
-        o = getCurrentKey();
-      } catch (IOException e) {
-        LOG.warn(e.getMessage());
-        LOG.warn(e.getStackTrace());
-      } catch (InterruptedException e) {
-        LOG.warn(e.getMessage());
-        LOG.warn(e.getStackTrace());
-      }
-      return o;
-    }
-
-    @Override
-    public Object createValue() {
-      return new DefaultHCatRecord();
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return lineCount;
-    }
-
-    @Override
-    public boolean next(Object key, Object value) throws IOException {
-      try {
-        if (!nextKeyValue()){
-          return false;
-        }
-        
-        ((HCatRecord)value).copy(getCurrentValue());
-        
-        return true;
-      } catch (InterruptedException e) {
-        LOG.warn(e.getMessage());
-        LOG.warn(e.getStackTrace());
-      }
-      return false;
-    }
 }

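The core of the new read path above is the SerDe-based conversion in getCurrentValue(): the raw Writable produced by the underlying mapred RecordReader is deserialized by the table's SerDe, wrapped in a LazyHCatRecord together with the partition-column values, and then materialized into a DefaultHCatRecord. A rough stand-alone sketch of that conversion, with the surrounding wiring (class and method names, the partCols map) assumed for illustration rather than taken from the patch:

    import java.util.Map;

    import org.apache.hadoop.hive.serde2.SerDe;
    import org.apache.hadoop.io.Writable;
    import org.apache.hcatalog.data.DefaultHCatRecord;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.LazyHCatRecord;

    public class SerDeConversionSketch {
        // Mirrors HCatRecordReader.getCurrentValue(): storage-format Writable in,
        // schema-aware HCatRecord out. partCols maps field position -> partition value.
        static HCatRecord toHCatRecord(SerDe serde, Writable rawValue,
                                       Map<Integer, Object> partCols) throws Exception {
            LazyHCatRecord lazy = new LazyHCatRecord(serde.deserialize(rawValue),
                                                     serde.getObjectInspector(),
                                                     partCols);
            // getAll() materializes the lazily decoded fields so the record can be
            // copied and reused by downstream MapReduce code.
            return new DefaultHCatRecord(lazy.getAll());
        }
    }
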
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatSplit.java Sat Feb 25 00:45:13 2012
@@ -22,16 +22,21 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
+
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
-public class HCatSplit extends InputSplit implements Writable,org.apache.hadoop.mapred.InputSplit {
+public class HCatSplit extends InputSplit 
+  implements Writable,org.apache.hadoop.mapred.InputSplit {
 
     Log LOG = LogFactory.getLog(HCatSplit.class);
     
@@ -39,10 +44,13 @@ public class HCatSplit extends InputSpli
     private PartInfo partitionInfo;
 
     /** The split returned by the underlying InputFormat split. */
-    private InputSplit baseSplit;
+    private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;
 
     /** The schema for the HCatTable */
     private HCatSchema tableSchema;
+
+    private HiveConf hiveConf;
+
     /**
      * Instantiates a new hcat split.
      */
@@ -53,13 +61,16 @@ public class HCatSplit extends InputSpli
      * Instantiates a new hcat split.
      *
      * @param partitionInfo the partition info
-     * @param baseSplit the base split
+     * @param baseMapRedSplit the base mapred split
      * @param tableSchema the table level schema
      */
-    public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
-        this.partitionInfo = partitionInfo;
-        this.baseSplit = baseSplit;
-        this.tableSchema = tableSchema;
+    public HCatSplit(PartInfo partitionInfo, 
+        org.apache.hadoop.mapred.InputSplit baseMapRedSplit, 
+        HCatSchema tableSchema) {
+
+      this.partitionInfo = partitionInfo;
+      this.baseMapRedSplit = baseMapRedSplit;
+      this.tableSchema = tableSchema;
     }
 
     /**
@@ -72,10 +83,10 @@ public class HCatSplit extends InputSpli
 
     /**
      * Gets the underlying InputSplit.
-     * @return the baseSplit
+     * @return the baseMapRedSplit
      */
-    public InputSplit getBaseSplit() {
-        return baseSplit;
+    public org.apache.hadoop.mapred.InputSplit getBaseSplit() {
+        return baseMapRedSplit;
     }
 
     /**
@@ -100,13 +111,10 @@ public class HCatSplit extends InputSpli
     @Override
     public long getLength() {
         try {
-          return baseSplit.getLength();
+          return baseMapRedSplit.getLength();
         } catch (IOException e) {
           LOG.warn(e.getMessage());
           LOG.warn(e.getStackTrace());
-        } catch (InterruptedException e) {
-          LOG.warn(e.getMessage());
-          LOG.warn(e.getStackTrace());
         }
         return 0; // we errored
     }
@@ -117,13 +125,10 @@ public class HCatSplit extends InputSpli
     @Override
     public String[] getLocations() {
         try {
-          return baseSplit.getLocations();
+          return baseMapRedSplit.getLocations();
         } catch (IOException e) {
           LOG.warn(e.getMessage());
           LOG.warn(e.getStackTrace());
-        } catch (InterruptedException e) {
-          LOG.warn(e.getMessage());
-          LOG.warn(e.getStackTrace());
         }
         return new String[0]; // we errored
     }
@@ -138,21 +143,22 @@ public class HCatSplit extends InputSpli
         partitionInfo = (PartInfo) HCatUtil.deserialize(partitionInfoString);
 
         String baseSplitClassName = WritableUtils.readString(input);
-        InputSplit split;
+        org.apache.hadoop.mapred.InputSplit split;
         try{
-            Class<? extends InputSplit> splitClass =
-                (Class<? extends InputSplit>) Class.forName(baseSplitClassName);
+            Class<? extends org.apache.hadoop.mapred.InputSplit> splitClass =
+                (Class<? extends org.apache.hadoop.mapred.InputSplit>) Class.forName(baseSplitClassName);
 
             //Class.forName().newInstance() does not work if the underlying
             //InputSplit has package visibility
-            Constructor<? extends InputSplit> constructor =
+            Constructor<? extends org.apache.hadoop.mapred.InputSplit> 
+              constructor =
                 splitClass.getDeclaredConstructor(new Class[]{});
             constructor.setAccessible(true);
 
             split = constructor.newInstance();
             // read baseSplit from input
             ((Writable)split).readFields(input);
-            this.baseSplit = split;
+            this.baseMapRedSplit = split;
         }catch(Exception e){
             throw new IOException ("Exception from " + baseSplitClassName, e);
         }
@@ -171,8 +177,8 @@ public class HCatSplit extends InputSpli
         // write partitionInfo into output
         WritableUtils.writeString(output, partitionInfoString);
 
-        WritableUtils.writeString(output, baseSplit.getClass().getName());
-        Writable baseSplitWritable = (Writable)baseSplit;
+        WritableUtils.writeString(output, baseMapRedSplit.getClass().getName());
+        Writable baseSplitWritable = (Writable)baseMapRedSplit;
         //write  baseSplit into output
         baseSplitWritable.write(output);
 
@@ -180,4 +186,8 @@ public class HCatSplit extends InputSpli
         String tableSchemaString = HCatUtil.serialize(tableSchema);
         WritableUtils.writeString(output, tableSchemaString);
     }
+
+    public HCatSchema getSchema() {
+      return tableSchema;
+    }
 }

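One detail worth noting in HCatSplit.readFields() above: the wrapped mapred split is reconstructed by class name, and Class.forName().newInstance() would fail for split classes with package-private constructors, hence the explicit getDeclaredConstructor()/setAccessible(true) step. A generic sketch of that pattern (the class and method names here are illustrative, not from the patch):

    import java.io.DataInput;
    import java.lang.reflect.Constructor;

    import org.apache.hadoop.io.Writable;

    public class SplitReflectionSketch {
        // Reconstructs a Writable whose no-arg constructor may not be public,
        // then populates it from the serialized bytes.
        static Writable readWrappedWritable(String className, DataInput input)
                throws Exception {
            Class<? extends Writable> clazz =
                Class.forName(className).asSubclass(Writable.class);
            Constructor<? extends Writable> ctor = clazz.getDeclaredConstructor();
            ctor.setAccessible(true);           // allow package-private access
            Writable instance = ctor.newInstance();
            instance.readFields(input);         // fill fields from the stream
            return instance;
        }
    }
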
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Sat Feb 25 00:45:13 2012
@@ -137,15 +137,17 @@ public class HCatTableInfo implements Se
    * @throws IOException
    */
   static HCatTableInfo valueOf(Table table) throws IOException {
-    HCatSchema dataColumns = HCatUtil.extractSchemaFromStorageDescriptor(table.getSd());
-    StorerInfo storerInfo = InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
+    HCatSchema dataColumns = 
+        HCatUtil.extractSchemaFromStorageDescriptor(table.getSd());
+    StorerInfo storerInfo = 
+        InternalUtil.extractStorerInfo(table.getSd(), table.getParameters());
     HCatSchema partitionColumns = HCatUtil.getPartitionColumns(table);
     return new HCatTableInfo(table.getDbName(),
-                                           table.getTableName(),
-                                           dataColumns,
-                                           partitionColumns,
-                                           storerInfo,
-                                           table);
+                             table.getTableName(),
+                             dataColumns,
+                             partitionColumns,
+                             storerInfo,
+                             table);
   }
 
   @Override

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Sat Feb 25 00:45:13 2012
@@ -27,14 +27,30 @@ import java.util.Properties;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.Deserializer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
+
 import org.apache.hcatalog.common.ErrorType;
 import org.apache.hcatalog.common.HCatConstants;
 import org.apache.hcatalog.common.HCatException;
@@ -91,16 +107,17 @@ public class InitializeInput {
         client = new HiveMetaStoreClient(hiveConf, null);
       }
       Table table = client.getTable(inputJobInfo.getDatabaseName(),
-                                                inputJobInfo.getTableName());
+                                    inputJobInfo.getTableName());
 
       List<PartInfo> partInfoList = new ArrayList<PartInfo>();
 
+      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
       if( table.getPartitionKeys().size() != 0 ) {
         //Partitioned table
         List<Partition> parts = client.listPartitionsByFilter(inputJobInfo.getDatabaseName(),
-                                                                                 inputJobInfo.getTableName(),
-                                                                                 inputJobInfo.getFilter(),
-                                                                                 (short) -1);
+                                                              inputJobInfo.getTableName(),
+                                                              inputJobInfo.getFilter(),
+                                                              (short) -1);
 
         // Default to 100,000 partitions if hive.metastore.maxpartition is not defined
         int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
@@ -110,19 +127,22 @@ public class InitializeInput {
 
         // populate partition info
         for (Partition ptn : parts){
-          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters());
-          partInfo.setPartitionValues(createPtnKeyValueMap(table,ptn));
+          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters(), 
+                                              job.getConfiguration(), 
+                                              inputJobInfo);
+          partInfo.setPartitionValues(createPtnKeyValueMap(table, ptn));
           partInfoList.add(partInfo);
         }
 
       }else{
         //Non partitioned table
-        PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters());
+        PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters(),
+                                            job.getConfiguration(), 
+                                            inputJobInfo);
         partInfo.setPartitionValues(new HashMap<String,String>());
         partInfoList.add(partInfo);
       }
       inputJobInfo.setPartitions(partInfoList);
-      inputJobInfo.setTableInfo(HCatTableInfo.valueOf(table));
 
       return HCatUtil.serialize(inputJobInfo);
     } finally {
@@ -154,63 +174,31 @@ public class InitializeInput {
     return ptnKeyValues;
   }
 
-  static PartInfo extractPartInfo(StorageDescriptor sd, Map<String,String> parameters) throws IOException{
+  static PartInfo extractPartInfo(StorageDescriptor sd, 
+      Map<String,String> parameters, Configuration conf, 
+      InputJobInfo inputJobInfo) throws IOException{
     HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
-    String inputStorageDriverClass = null;
-    Properties hcatProperties = new Properties();
-    if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){
-      inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS);
-    }else{
-      // attempt to default to RCFile if the storage descriptor says it's an RCFile
-      if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
-        inputStorageDriverClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
-      }else{
-        throw new IOException("No input storage driver classname found, cannot read partition");
-      }
-    }
-    for (String key : parameters.keySet()){
-      if (key.startsWith(HCAT_KEY_PREFIX)){
-        hcatProperties.put(key, parameters.get(key));
-      }
-    }
-    return new PartInfo(schema,inputStorageDriverClass,  sd.getLocation(), hcatProperties);
-  }
-
+    StorerInfo storerInfo = InternalUtil.extractStorerInfo(sd,parameters);
 
+    Properties hcatProperties = new Properties();
+    HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(conf, 
+                                                                   storerInfo);
 
-  static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
-    String inputSDClass, outputSDClass;
-
-    if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)){
-      inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS);
-    }else{
-      // attempt to default to RCFile if the storage descriptor says it's an RCFile
-      if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
-        inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
-      }else{
-        throw new IOException("No input storage driver classname found for table, cannot write partition");
-      }
-    }
+    // copy the properties from storageHandler to jobProperties
+    Map<String, String>jobProperties = HCatUtil.getInputJobProperties(
+                                                            storageHandler, 
+                                                            inputJobInfo);
 
-    if (properties.containsKey(HCatConstants.HCAT_OSD_CLASS)){
-      outputSDClass = properties.get(HCatConstants.HCAT_OSD_CLASS);
-    }else{
-      // attempt to default to RCFile if the storage descriptor says it's an RCFile
-      if ((sd.getOutputFormat() != null) && (sd.getOutputFormat().equals(HCatConstants.HIVE_RCFILE_OF_CLASS))){
-        outputSDClass = HCatConstants.HCAT_RCFILE_OSD_CLASS;
-      }else{
-        throw new IOException("No output storage driver classname found for table, cannot write partition");
-      }
-    }
-
-    Properties hcatProperties = new Properties();
-    for (String key : properties.keySet()){
+    for (String key : parameters.keySet()){
       if (key.startsWith(HCAT_KEY_PREFIX)){
-        hcatProperties.put(key, properties.get(key));
+        hcatProperties.put(key, parameters.get(key));
       }
     }
-
-    return new StorerInfo(inputSDClass, outputSDClass, hcatProperties);
+    // FIXME 
+    // Bloating partinfo with inputJobInfo is not good
+    return new PartInfo(schema, storageHandler,
+                        sd.getLocation(), hcatProperties,
+                        jobProperties, inputJobInfo.getTableInfo());
   }
 
     static HiveConf getHiveConf(InputJobInfo iInfo, Configuration conf)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InputJobInfo.java Sat Feb 25 00:45:13 2012
@@ -56,6 +56,9 @@ public class InputJobInfo implements Ser
   /** implementation specific job properties */
   private Properties properties;
 
+  /** job properties */
+  private Map<String,String> jobProperties;
+
   /**
    * Initializes a new InputJobInfo
    * for reading data from a table.
@@ -70,19 +73,21 @@ public class InputJobInfo implements Ser
    * The special string _HOST will be replaced automatically with the correct host name
    */
   public static InputJobInfo create(String databaseName,
-                                                  String tableName,
-                                                  String filter,
-                                                  String serverUri,
-                                                  String serverKerberosPrincipal) {
-    return new InputJobInfo(databaseName,tableName,filter,serverUri,serverKerberosPrincipal);
+                                    String tableName,
+                                    String filter,
+                                    String serverUri,
+                                    String serverKerberosPrincipal) {
+    return new InputJobInfo(databaseName, tableName, filter, 
+                            serverUri, serverKerberosPrincipal);
   }
 
   private InputJobInfo(String databaseName,
-                                String tableName,
-                                String filter,
-                                String serverUri,
-                                String serverKerberosPrincipal) {
-    this.databaseName = (databaseName == null) ? MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
+                       String tableName,
+                       String filter,
+                       String serverUri,
+                       String serverKerberosPrincipal) {
+    this.databaseName = (databaseName == null) ? 
+                        MetaStoreUtils.DEFAULT_DATABASE_NAME : databaseName;
     this.tableName = tableName;
     this.serverUri = serverUri;
     this.serverKerberosPrincipal = serverKerberosPrincipal;
@@ -169,5 +174,4 @@ public class InputJobInfo implements Ser
   public Properties getProperties() {
     return properties;
   }
-
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java?rev=1293495&r1=1293494&r2=1293495&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InternalUtil.java Sat Feb 25 00:45:13 2012
@@ -34,25 +34,19 @@ import java.util.Properties;
 class InternalUtil {
 
     static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
-        String inputSDClass, outputSDClass;
-
-        if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)){
-          inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS);
-        }else{
-          // attempt to default to RCFile if the storage descriptor says it's an RCFile
-          if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
-            inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
-          }else{
-            throw new IOException("No input storage driver classname found for table, cannot write partition");
-          }
-        }
-
         Properties hcatProperties = new Properties();
         for (String key : properties.keySet()){
             hcatProperties.put(key, properties.get(key));
         }
+        
+        // also populate with StorageDescriptor->SerDe.Parameters
+        for (Map.Entry<String, String>param :
+            sd.getSerdeInfo().getParameters().entrySet()) {
+          hcatProperties.put(param.getKey(), param.getValue());
+        }
+
 
-        return new StorerInfo(inputSDClass, null,
+        return new StorerInfo(null, null,
                 sd.getInputFormat(), sd.getOutputFormat(), sd.getSerdeInfo().getSerializationLib(),
                 properties.get(org.apache.hadoop.hive.metastore.api.Constants.META_TABLE_STORAGE),
                 hcatProperties);
@@ -120,9 +114,24 @@ class InternalUtil {
 
   //TODO this has to find a better home, it's also hardcoded as default in hive would be nice
   // if the default was decided by the serde
-  static void initializeOutputSerDe(SerDe serDe, Configuration conf, OutputJobInfo jobInfo) throws SerDeException {
+  static void initializeOutputSerDe(SerDe serDe, Configuration conf, 
+                                    OutputJobInfo jobInfo) 
+  throws SerDeException {
+    initializeSerDe(serDe, conf, jobInfo.getTableInfo(), 
+                    jobInfo.getOutputSchema()); 
+  }
+
+  static void initializeInputSerDe(SerDe serDe, Configuration conf, 
+                                   HCatTableInfo info)
+  throws SerDeException {
+    initializeSerDe(serDe, conf, info, info.getDataColumns()); 
+  }
+
+  static void initializeSerDe(SerDe serDe, Configuration conf, 
+                              HCatTableInfo info, HCatSchema s)
+  throws SerDeException {
      Properties props = new Properties();
-    List<FieldSchema> fields = HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields());
+    List<FieldSchema> fields = HCatUtil.getFieldSchemaList(s.getFields());
     props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMNS,
           MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
     props.setProperty(org.apache.hadoop.hive.serde.Constants.LIST_COLUMN_TYPES,
@@ -133,14 +142,12 @@ class InternalUtil {
     props.setProperty(org.apache.hadoop.hive.serde.Constants.SERIALIZATION_FORMAT, "1");
 
     //add props from params set in table schema
-    props.putAll(jobInfo.getTableInfo().getStorerInfo().getProperties());
-    //add props from job properties
-    props.putAll(jobInfo.getProperties());
+    props.putAll(info.getStorerInfo().getProperties());
 
     serDe.initialize(conf,props);
   }
 
-  static Reporter createReporter(TaskAttemptContext context) {
+static Reporter createReporter(TaskAttemptContext context) {
       return new ProgressReporter(context);
   }
 

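As context for the new initializeSerDe() above: a Hive SerDe is configured entirely through a Properties bundle, and the method builds the column list, column types, and serialization format from the HCat schema before layering on the table's storer properties. Below is a hand-rolled equivalent for a hypothetical two-column table, using LazySimpleSerDe purely as a stand-in for whatever SerDe the table's StorerInfo actually names:

    import java.util.Properties;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde.Constants;
    import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;

    public class SerDeInitSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty(Constants.LIST_COLUMNS, "id,name");         // column names
            props.setProperty(Constants.LIST_COLUMN_TYPES, "int,string"); // matching types
            props.setProperty(Constants.SERIALIZATION_FORMAT, "1");       // same default as the patch
            // In HCat, the table's storer properties would be added on top of these.
            LazySimpleSerDe serde = new LazySimpleSerDe();
            serde.initialize(new Configuration(), props);
            System.out.println(serde.getObjectInspector().getTypeName());
        }
    }
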

