incubator-hcatalog-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1242773 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/data/ src/java/org/apache/hcatalog/data/schema/ src/java/org/apache/hcatalog/pig/ src/java/org/apache/hcatalog/rcfile/ src/java/org/apache/hcatalog/shims/ src/test/org...
Date Fri, 10 Feb 2012 13:59:59 GMT
Author: hashutosh
Date: Fri Feb 10 13:59:58 2012
New Revision: 1242773

URL: http://svn.apache.org/viewvc?rev=1242773&view=rev
Log:
HCAT-2. Support nested schema conversion between Hive and Pig (julienledem via hashutosh)
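
For context, the conversion added here maps Pig's nested bag/tuple schemas onto Hive's array/struct types (and back). A rough sketch of the kind of mapping the new TestHCatLoaderComplexSchema below exercises; the aliases are illustrative only:

    Pig:  d: { t: (da: long, db: (dba: chararray, dbb: long)) }
    Hive: d array<struct<da: bigint, db: struct<dba: string, dbb: bigint>>>

Bags whose tuples hold a single primitive field may also map to a plain array (e.g. array<bigint>), as covered by the second table schema in that test.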

Added:
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MockLoader.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
Removed:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/HCatArrayBag.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Fri Feb 10 13:59:58 2012
@@ -23,12 +23,15 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-2 Support nested schema conversion between Hive an Pig (julienledem via hashutosh)
 
   IMPROVEMENTS
-
+  HCAT-194. Better error messages for HCatalog access control errors (julienledem via hashutosh)  
+  
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-254. Fix storage-drives build.xml (hashutosh via khorgath)
 
 Release 0.3.0 - Unreleased
 
@@ -94,7 +97,7 @@ Release 0.3.0 - Unreleased
 
   HCAT-179. Make HCatalog compile against Hadoop 0.23 (thw via khorgath)
 
-  HCAT-194. Better error messages for HCatalog access control errors (julienledem via hashutosh)  
+
 
   HCAT-184. Optionally do not generate forrest docs (traviscrawford via hashutosh)
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatFieldSchema.java Fri Feb 10 13:59:58 2012
@@ -122,7 +122,7 @@ public class HCatFieldSchema implements 
      * @throws HCatException if call made on non-primitive types
      */
     public HCatFieldSchema(String fieldName, Type type, String comment) throws HCatException {
-        assertTypeInCategory(type,Category.PRIMITIVE);
+        assertTypeInCategory(type,Category.PRIMITIVE,fieldName);
         this.fieldName = fieldName;
         this.type = type;
         this.category = Category.PRIMITIVE;
@@ -162,8 +162,8 @@ public class HCatFieldSchema implements 
      * @throws HCatException if call made on non-Map types
      */
     public HCatFieldSchema(String fieldName, Type type, Type mapKeyType, HCatSchema mapValueSchema, String comment) throws HCatException{
-        assertTypeInCategory(type,Category.MAP);
-        assertTypeInCategory(mapKeyType,Category.PRIMITIVE);
+        assertTypeInCategory(type,Category.MAP, fieldName);
+        assertTypeInCategory(mapKeyType,Category.PRIMITIVE, fieldName);
         this.fieldName = fieldName;
         this.type = Type.MAP;
         this.category = Category.MAP;
@@ -174,29 +174,29 @@ public class HCatFieldSchema implements 
     }
 
     public HCatSchema getStructSubSchema() throws HCatException {
-        assertTypeInCategory(this.type,Category.STRUCT);
+        assertTypeInCategory(this.type,Category.STRUCT, this.fieldName);
         return subSchema;
     }
 
     public HCatSchema getArrayElementSchema() throws HCatException {
-        assertTypeInCategory(this.type,Category.ARRAY);
+        assertTypeInCategory(this.type,Category.ARRAY, this.fieldName);
         return subSchema;
     }
 
     public Type getMapKeyType() throws HCatException {
-        assertTypeInCategory(this.type,Category.MAP);
+        assertTypeInCategory(this.type,Category.MAP, this.fieldName);
         return mapKeyType;
     }
 
     public HCatSchema getMapValueSchema() throws HCatException {
-        assertTypeInCategory(this.type,Category.MAP);
+        assertTypeInCategory(this.type,Category.MAP, this.fieldName);
         return subSchema;
     }
 
-    private static void assertTypeInCategory(Type type, Category category) throws HCatException {
+    private static void assertTypeInCategory(Type type, Category category, String fieldName) throws HCatException {
         Category typeCategory = Category.fromType(type);
         if (typeCategory != category){
-            throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory);
+            throw new HCatException("Type category mismatch. Expected "+category+" but type "+type+" in category "+typeCategory+ " (field "+fieldName+")");
         }
     }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/data/schema/HCatSchemaUtils.java Fri Feb 10 13:59:58 2012
@@ -18,6 +18,7 @@
 package org.apache.hcatalog.data.schema;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
@@ -35,26 +36,24 @@ import org.apache.hcatalog.data.schema.H
 
 public class HCatSchemaUtils {
 
-    private static HCatSchemaUtils ref = new HCatSchemaUtils();
-
     public static CollectionBuilder getStructSchemaBuilder(){
-        return ref.new CollectionBuilder();
+        return new CollectionBuilder();
     }
 
     public static CollectionBuilder getListSchemaBuilder(){
-        return ref.new CollectionBuilder();
+        return new CollectionBuilder();
     }
 
     public static MapBuilder getMapSchemaBuilder(){
-        return ref.new MapBuilder();
+        return new MapBuilder();
     }
 
 
-    public abstract class HCatSchemaBuilder {
+    public static abstract class HCatSchemaBuilder {
         public abstract HCatSchema build() throws HCatException;
     }
 
-    public class CollectionBuilder extends HCatSchemaBuilder { // for STRUCTS(multiple-add-calls) and LISTS(single-add-call)
+    public static class CollectionBuilder extends HCatSchemaBuilder { // for STRUCTS(multiple-add-calls) and LISTS(single-add-call)
         List<HCatFieldSchema> fieldSchemas = null;
 
         CollectionBuilder(){
@@ -77,7 +76,7 @@ public class HCatSchemaUtils {
 
     }
 
-    public class MapBuilder extends HCatSchemaBuilder {
+    public static class MapBuilder extends HCatSchemaBuilder {
 
         Type keyType = null;
         HCatSchema valueSchema = null;
@@ -116,21 +115,23 @@ public class HCatSchemaUtils {
 
     private static HCatFieldSchema getHCatFieldSchema(String fieldName, TypeInfo fieldTypeInfo) throws HCatException {
         Category typeCategory = fieldTypeInfo.getCategory();
+        HCatFieldSchema hCatFieldSchema;
         if (Category.PRIMITIVE == typeCategory){
-            return new HCatFieldSchema(fieldName,getPrimitiveHType(fieldTypeInfo),null);
+            hCatFieldSchema = new HCatFieldSchema(fieldName,getPrimitiveHType(fieldTypeInfo),null);
         } else if (Category.STRUCT == typeCategory) {
             HCatSchema subSchema = constructHCatSchema((StructTypeInfo)fieldTypeInfo);
-            return new HCatFieldSchema(fieldName,HCatFieldSchema.Type.STRUCT,subSchema,null);
+            hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.STRUCT,subSchema,null);
         } else if (Category.LIST == typeCategory) {
             HCatSchema subSchema = getHCatSchema(((ListTypeInfo)fieldTypeInfo).getListElementTypeInfo());
-            return new HCatFieldSchema(fieldName,HCatFieldSchema.Type.ARRAY,subSchema,null);
+            hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.ARRAY,subSchema,null);
         } else if (Category.MAP == typeCategory) {
             HCatFieldSchema.Type mapKeyType =  getPrimitiveHType(((MapTypeInfo)fieldTypeInfo).getMapKeyTypeInfo());
             HCatSchema subSchema = getHCatSchema(((MapTypeInfo)fieldTypeInfo).getMapValueTypeInfo());
-            return new HCatFieldSchema(fieldName,HCatFieldSchema.Type.MAP,mapKeyType,subSchema,null);
+            hCatFieldSchema = new HCatFieldSchema(fieldName,HCatFieldSchema.Type.MAP,mapKeyType,subSchema,null);
         } else{
             throw new TypeNotPresentException(fieldTypeInfo.getTypeName(),null);
         }
+        return hCatFieldSchema;
     }
 
     private static Type getPrimitiveHType(TypeInfo basePrimitiveTypeInfo) {
@@ -180,23 +181,25 @@ public class HCatSchemaUtils {
 
     public static HCatSchema getHCatSchema(TypeInfo typeInfo) throws HCatException {
         Category typeCategory = typeInfo.getCategory();
+        HCatSchema hCatSchema;
         if (Category.PRIMITIVE == typeCategory){
-            return getStructSchemaBuilder().addField(new HCatFieldSchema(null,getPrimitiveHType(typeInfo),null)).build();
+            hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null,getPrimitiveHType(typeInfo),null)).build();
         } else if (Category.STRUCT == typeCategory) {
             HCatSchema subSchema = constructHCatSchema((StructTypeInfo) typeInfo);
-            return getStructSchemaBuilder().addField(new HCatFieldSchema(null,Type.STRUCT,subSchema,null)).build();
+            hCatSchema = getStructSchemaBuilder().addField(new HCatFieldSchema(null,Type.STRUCT,subSchema,null)).build();
         } else if (Category.LIST == typeCategory) {
-            CollectionBuilder builder = getStructSchemaBuilder();
+            CollectionBuilder builder = getListSchemaBuilder();
             builder.addField(getHCatFieldSchema(null,((ListTypeInfo)typeInfo).getListElementTypeInfo()));
-            return builder.build();
+            hCatSchema = new HCatSchema(Arrays.asList(new HCatFieldSchema("",Type.ARRAY, builder.build(), "")));
         } else if (Category.MAP == typeCategory) {
             HCatFieldSchema.Type mapKeyType =  getPrimitiveHType(((MapTypeInfo)typeInfo).getMapKeyTypeInfo());
             HCatSchema subSchema = getHCatSchema(((MapTypeInfo)typeInfo).getMapValueTypeInfo());
             MapBuilder builder = getMapSchemaBuilder();
-            return builder.withKeyType(mapKeyType).withValueSchema(subSchema).build();
+            hCatSchema = builder.withKeyType(mapKeyType).withValueSchema(subSchema).build();
         } else{
             throw new TypeNotPresentException(typeInfo.getTypeName(),null);
         }
+        return hCatSchema;
     }
 
     public static HCatSchema getHCatSchemaFromTypeString(String typeString) throws HCatException {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java Fri Feb 10 13:59:58 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
@@ -81,7 +82,7 @@ public abstract class HCatBaseStorer ext
       for(String partKVP : partKVPs){
         String[] partKV = partKVP.split("=");
         if(partKV.length == 2) {
-          String partKey = partKV[0].trim(); 
+          String partKey = partKV[0].trim();
           partitionKeys.add(partKey);
           partitions.put(partKey, partKV[1].trim());
         } else {
@@ -118,58 +119,33 @@ public abstract class HCatBaseStorer ext
    * schema of the table in metastore.
    */
   protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
-
     List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
     for(FieldSchema fSchema : pigSchema.getFields()){
-      byte type = fSchema.type;
-      HCatFieldSchema hcatFSchema;
-
       try {
+        HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
 
-        // Find out if we need to throw away the tuple or not.
-        if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){
-          List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-          arrFields.add(getHCatFSFromPigFS(fSchema.schema.getField(0).schema.getField(0), tableSchema));
-          hcatFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null);
-      }
-      else{
-          hcatFSchema = getHCatFSFromPigFS(fSchema, tableSchema);
-      }
-      fieldSchemas.add(hcatFSchema);
+        fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
       } catch (HCatException he){
           throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
       }
     }
-
     return new HCatSchema(fieldSchemas);
   }
 
-  private void validateUnNested(Schema innerSchema) throws FrontendException{
-
-    for(FieldSchema innerField : innerSchema.getFields()){
-      validateAlias(innerField.alias);
-      if(DataType.isComplex(innerField.type)) {
-        throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-      }
-    }
-  }
-
-  private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
-
-    String colName = bagFieldSchema.alias;
-    for(HCatFieldSchema field : tableSchema.getFields()){
-      if(colName.equalsIgnoreCase(field.getName())){
-        return (field.getArrayElementSchema().get(0).getType() == Type.STRUCT) ? false : true;
-      }
+  public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException{
+    if (hcatFieldSchema != null && hcatFieldSchema.getArrayElementSchema().get(0).getType() != Type.STRUCT) {
+      return true;
     }
     // Column was not found in table schema. Its a new column
     List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
-    return (tupSchema.size() == 1 && tupSchema.get(0).schema == null) ? true : false;
+    if (hcatFieldSchema == null && tupSchema.size() == 1 && (tupSchema.get(0).schema == null || (tupSchema.get(0).type == DataType.TUPLE && tupSchema.get(0).schema.size() == 1))) {
+      return true;
+    }
+    return false;
   }
 
 
-  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatSchema hcatTblSchema) throws FrontendException, HCatException{
-
+  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException{
     byte type = fSchema.type;
     switch(type){
 
@@ -191,19 +167,29 @@ public abstract class HCatBaseStorer ext
 
     case DataType.BYTEARRAY:
     	return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-    	
+
     case DataType.BAG:
       Schema bagSchema = fSchema.schema;
       List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
-      arrFields.add(getHCatFSFromPigFS(bagSchema.getField(0), hcatTblSchema));
+      FieldSchema field;
+      // Find out if we need to throw away the tuple or not.
+      if (removeTupleFromBag(hcatFieldSchema, fSchema)) {
+        field = bagSchema.getField(0).schema.getField(0);
+      } else {
+        field = bagSchema.getField(0);
+      }
+      arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
       return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
 
     case DataType.TUPLE:
       List<String> fieldNames = new ArrayList<String>();
       List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
-      for( FieldSchema fieldSchema : fSchema.schema.getFields()){
-        fieldNames.add( fieldSchema.alias);
-        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, hcatTblSchema));
+      HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
+      List<FieldSchema> fields = fSchema.schema.getFields();
+      for (int i = 0; i < fields.size(); i++) {
+        FieldSchema fieldSchema = fields.get(i);
+        fieldNames.add(fieldSchema.alias);
+        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
       }
       return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
 
@@ -211,27 +197,12 @@ public abstract class HCatBaseStorer ext
       // Pig's schema contain no type information about map's keys and
       // values. So, if its a new column assume <string,string> if its existing
       // return whatever is contained in the existing column.
-      HCatFieldSchema mapField = getTableCol(fSchema.alias, hcatTblSchema);
+
       HCatFieldSchema valFS;
       List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
 
-      if(mapField != null){
-        Type mapValType = mapField.getMapValueSchema().get(0).getType();
-
-        switch(mapValType){
-        case STRING:
-        case BIGINT:
-        case INT:
-        case FLOAT:
-        case DOUBLE:
-        case BINARY:
-          valFS = new HCatFieldSchema(fSchema.alias, mapValType, null);
-          break;
-        default:
-          throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE);
-        }
-        valFSList.add(valFS);
-        return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
+      if(hcatFieldSchema != null){
+        return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
       }
 
       // Column not found in target table. Its a new column. Its schema is map<string,string>
@@ -267,55 +238,83 @@ public abstract class HCatBaseStorer ext
     }
   }
 
-  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws ExecException, HCatException{
-
-    // The real work-horse. Spend time and energy in this method if there is
-    // need to keep HCatStorer lean and go fast.
-    Type type = hcatFS.getType();
-
-    switch(type){
+  private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException{
+    try {
 
-    case BINARY:
-    	ByteArrayRef ba = new ByteArrayRef();
-    	byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get(); 
-    	ba.setData(bytes);
-    	return ba;
-    	
-    case STRUCT:
-      // Unwrap the tuple.
-      return ((Tuple)pigObj).getAll();
-      //        Tuple innerTup = (Tuple)pigObj;
-      //
-      //      List<Object> innerList = new ArrayList<Object>(innerTup.size());
-      //      int i = 0;
-      //      for(HCatTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
-      //        innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
-      //      }
-      //      return innerList;
-    case ARRAY:
-      // Unwrap the bag.
-      DataBag pigBag = (DataBag)pigObj;
-      HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
-      boolean needTuple = tupFS.getType() == Type.STRUCT;
-      List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
-      Iterator<Tuple> bagItr = pigBag.iterator();
-
-      while(bagItr.hasNext()){
-        // If there is only one element in tuple contained in bag, we throw away the tuple.
-        bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
+      // The real work-horse. Spend time and energy in this method if there is
+      // need to keep HCatStorer lean and go fast.
+      Type type = hcatFS.getType();
+      switch(type){
+
+      case BINARY:
+        ByteArrayRef ba = new ByteArrayRef();
+        byte[] bytes = (null == pigObj) ? new byte[0] : ((DataByteArray)pigObj).get();
+        ba.setData(bytes);
+        return ba;
+
+      case STRUCT:
+        if (pigObj == null) {
+          return null;
+        }
+        HCatSchema structSubSchema = hcatFS.getStructSubSchema();
+        // Unwrap the tuple.
+        List<Object> all = ((Tuple)pigObj).getAll();
+        ArrayList<Object> converted = new ArrayList<Object>(all.size());
+        for (int i = 0; i < all.size(); i++) {
+          converted.add(getJavaObj(all.get(i), structSubSchema.get(i)));
+        }
+        return converted;
 
-      }
-      return bagContents;
+      case ARRAY:
+        if (pigObj == null) {
+          return null;
+        }
+        // Unwrap the bag.
+        DataBag pigBag = (DataBag)pigObj;
+        HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
+        boolean needTuple = tupFS.getType() == Type.STRUCT;
+        List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
+        Iterator<Tuple> bagItr = pigBag.iterator();
+
+        while(bagItr.hasNext()){
+          // If there is only one element in tuple contained in bag, we throw away the tuple.
+          bagContents.add(getJavaObj(needTuple ? bagItr.next() : bagItr.next().get(0), tupFS));
 
-      //    case MAP:
-      //     Map<String,DataByteArray> pigMap = (Map<String,DataByteArray>)pigObj;
-      //     Map<String,Long> typeMap = new HashMap<String, Long>();
-      //     for(Entry<String, DataByteArray> entry: pigMap.entrySet()){
-      //       typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
-      //     }
-      //     return typeMap;
-    default:
-      return pigObj;
+        }
+        return bagContents;
+      case MAP:
+        if (pigObj == null) {
+          return null;
+        }
+        Map<?,?> pigMap = (Map<?,?>)pigObj;
+        Map<Object,Object> typeMap = new HashMap<Object, Object>();
+        for(Entry<?, ?> entry: pigMap.entrySet()){
+          // the value has a schema and not a FieldSchema
+          typeMap.put(
+              // Schema validation enforces that the Key is a String
+              (String)entry.getKey(),
+              getJavaObj(entry.getValue(), hcatFS.getMapValueSchema().get(0)));
+        }
+        return typeMap;
+      case STRING:
+      case INT:
+      case BIGINT:
+      case FLOAT:
+      case DOUBLE:
+        return pigObj;
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        // would not pass schema validation anyway
+        throw new BackendException("Incompatible type "+type+" found in hcat table schema: "+hcatFS, PigHCatUtil.PIG_EXCEPTION_CODE);
+      default:
+        throw new BackendException("Unexpected type "+type+" for value "+pigObj + (pigObj == null ? "" : " of class " + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    } catch (BackendException e) {
+      // provide the path to the field in the error message
+      throw new BackendException(
+          (hcatFS.getName() == null ? " " : hcatFS.getName()+".") + e.getMessage(),
+          e.getCause() == null ? e : e.getCause());
     }
   }
 
@@ -339,84 +338,51 @@ public abstract class HCatBaseStorer ext
     // dictated by semantics, consult HCatSchema of table when need be.
 
     for(FieldSchema pigField : pigSchema.getFields()){
-      byte type = pigField.type;
-      String alias = pigField.alias;
-      validateAlias(alias);
-      HCatFieldSchema hcatField = getTableCol(alias, tblSchema);
-
-      if(DataType.isComplex(type)){
-        switch(type){
-
-        case DataType.MAP:
-          if(hcatField != null){
-            if(hcatField.getMapKeyType() != Type.STRING){
-              throw new FrontendException("Key Type of map must be String "+hcatField,  PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            if(hcatField.getMapValueSchema().get(0).isComplex()){
-              throw new FrontendException("Value type of map cannot be complex" + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
+      HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
 
-        case DataType.BAG:
-          // Only map is allowed as complex type in tuples inside bag.
-          for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
-            if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
-              throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-            validateAlias(innerField.alias);
-          }
-          if(hcatField != null){
-            // Do the same validation for HCatSchema.
-            HCatFieldSchema arrayFieldScehma = hcatField.getArrayElementSchema().get(0);
-            Type hType = arrayFieldScehma.getType();
-            if(hType == Type.STRUCT){
-              for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){
-                if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
-                  throw new FrontendException("Nested Complex types not allowed "+ hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-                }
-              }
-            }
-            if(hType == Type.MAP){
-              if(arrayFieldScehma.getMapKeyType() != Type.STRING){
-                throw new FrontendException("Key Type of map must be String "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-              if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){
-                throw new FrontendException("Value type of map cannot be complex "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
-            if(hType == Type.ARRAY) {
-              throw new FrontendException("Arrays cannot contain array within it. "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-            }
-          }
-          break;
+      validateSchema(pigField, hcatField);
+    }
+
+    try {
+      PigHCatUtil.validateHCatTableSchemaFollowsPigRules(tblSchema);
+    } catch (IOException e) {
+      throw new FrontendException("HCatalog schema is not compatible with Pig: "+e.getMessage(),  PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
+  }
+
+
+  private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+      throws HCatException, FrontendException {
+    validateAlias(pigField.alias);
+    byte type = pigField.type;
+    if(DataType.isComplex(type)){
+      switch(type){
 
-        case DataType.TUPLE:
-          validateUnNested(pigField.schema);
-          if(hcatField != null){
-            for(HCatFieldSchema structFieldSchema : hcatField.getStructSubSchema().getFields()){
-              if(structFieldSchema.isComplex()){
-                throw new FrontendException("Nested Complex types are not allowed."+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-              }
-            }
+      case DataType.MAP:
+        if(hcatField != null){
+          if(hcatField.getMapKeyType() != Type.STRING){
+            throw new FrontendException("Key Type of map must be String "+hcatField,  PigHCatUtil.PIG_EXCEPTION_CODE);
           }
-          break;
+          // Map values can be primitive or complex
+        }
+        break;
 
-        default:
-          throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+      case DataType.BAG:
+        HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
+        for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
+          validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
         }
-      }
-    }
+        break;
 
-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
+      case DataType.TUPLE:
+        HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
+        for(FieldSchema innerField : pigField.schema.getFields()){
+          validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+        }
+        break;
 
-      // We dont do type promotion/demotion.
-      Type hType = hcatField.getType();
-      switch(hType){
-      case SMALLINT:
-      case TINYINT:
-      case BOOLEAN:
-        throw new FrontendException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      default:
+        throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
       }
     }
   }
@@ -431,11 +397,12 @@ public abstract class HCatBaseStorer ext
   }
 
   // Finds column by name in HCatSchema, if not found returns null.
-  private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
-
-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
-      if(hcatField.getName().equalsIgnoreCase(alias)){
-        return hcatField;
+  private HCatFieldSchema getColFromSchema(String alias, HCatSchema tblSchema){
+    if (tblSchema != null) {
+      for(HCatFieldSchema hcatField : tblSchema.getFields()){
+        if(hcatField!=null && hcatField.getName()!= null && hcatField.getName().equalsIgnoreCase(alias)){
+          return hcatField;
+        }
       }
     }
     // Its a new column

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Feb 10 13:59:58 2012
@@ -83,12 +83,14 @@ public class HCatStorer extends HCatBase
                                                              partitions,
                                                              PigHCatUtil.getHCatServerUri(job),
                                                              PigHCatUtil.getHCatServerPrincipal(job));
-    } else {
+    } else if(userStr.length == 1) {
       outputJobInfo = OutputJobInfo.create(null,
                                                              userStr[0],
                                                              partitions,
                                                              PigHCatUtil.getHCatServerUri(job),
                                                              PigHCatUtil.getHCatServerPrincipal(job));
+    } else {
+      throw new FrontendException("location "+location+" is invalid. It must be of the form [db.]table", PigHCatUtil.PIG_EXCEPTION_CODE);
     }
 
 
@@ -119,24 +121,24 @@ public class HCatStorer extends HCatBase
       computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
       HCatOutputFormat.setSchema(job, computedSchema);
       p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      
+
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-      
+
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
 
     }else{
       config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      
+
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
       PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
-      
+
     }
   }
 

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Fri Feb 10 13:59:58 2012
@@ -20,9 +20,11 @@ package org.apache.hcatalog.pig;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,19 +34,18 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatArrayBag;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.Pair;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;
-import org.apache.hcatalog.data.schema.HCatSchema;
 import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
-import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -92,7 +93,7 @@ public class PigHCatUtil {
   static HiveMetaStoreClient client = null;
 
   private static HiveMetaStoreClient createHiveMetaClient(String serverUri,
-      String serverKerberosPrincipal, Class clazz) throws Exception {
+      String serverKerberosPrincipal, Class<?> clazz) throws Exception {
     if (client != null){
       return client;
     }
@@ -102,12 +103,12 @@ public class PigHCatUtil {
       hiveConf.set("hive.metastore.local", "false");
       hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
     }
-    
+
     if (serverKerberosPrincipal != null){
       hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
-      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);    	
+      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
     }
-    
+
     try {
       client = new HiveMetaStoreClient(hiveConf,null);
     } catch (Exception e){
@@ -209,6 +210,11 @@ public class PigHCatUtil {
     HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
     if(arrayElementFieldSchema.getType() == Type.STRUCT) {
       bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
+    } else if(arrayElementFieldSchema.getType() == Type.ARRAY) {
+      ResourceSchema s = new ResourceSchema();
+      List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
+      s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+      bagSubFieldSchemas[0].setSchema(s);
     } else {
       ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
       innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName("innerfield")
@@ -217,7 +223,8 @@ public class PigHCatUtil {
         .setSchema(null); // the element type is not a tuple - so no subschema
       bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
     }
-    return new ResourceSchema().setFields(bagSubFieldSchemas);
+    ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
+    return s;
 
   }
 
@@ -279,7 +286,7 @@ public class PigHCatUtil {
     if (type == Type.BINARY){
         return DataType.BYTEARRAY;
     }
-    
+
     if (type == Type.BOOLEAN){
       errMsg = "HCatalog column type 'BOOLEAN' is not supported in " +
       "Pig as a column type";
@@ -291,27 +298,34 @@ public class PigHCatUtil {
   }
 
   public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
-      if (hr == null){
-        return null;
-      }
-      return transformToTuple(hr.getAll(),hs);
+    if (hr == null){
+      return null;
     }
+    return transformToTuple(hr.getAll(),hs);
+  }
 
   @SuppressWarnings("unchecked")
   public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
-      Type itemType = hfs.getType();
-      switch (itemType){
-	case BINARY:
-	  return new DataByteArray(((ByteArrayRef)o).getData());
-	case STRUCT:
-          return transformToTuple((List<Object>)o,hfs);
-	case ARRAY:
-          return transformToBag((List<? extends Object>) o,hfs);
-	case MAP:
-          return transformToPigMap((Map<String, Object>)o,hfs);
-	default:
-	  return o;
-      }
+    Object result;
+    Type itemType = hfs.getType();
+    switch (itemType){
+    case BINARY:
+      result = new DataByteArray(((ByteArrayRef)o).getData());
+      break;
+    case STRUCT:
+      result = transformToTuple((List<Object>)o,hfs);
+      break;
+    case ARRAY:
+      result = transformToBag((List<? extends Object>) o,hfs);
+      break;
+    case MAP:
+      result = transformToPigMap((Map<String, Object>)o,hfs);
+      break;
+    default:
+      result = o;
+      break;
+    }
+    return result;
   }
 
   public static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
@@ -319,7 +333,7 @@ public class PigHCatUtil {
           return transformToTuple(objList,hfs.getStructSubSchema());
       } catch (Exception e){
           if (hfs.getType() != Type.STRUCT){
-              throw new Exception("Expected Struct type, got "+hfs.getType());
+              throw new Exception("Expected Struct type, got "+hfs.getType(), e);
           } else {
               throw e;
           }
@@ -327,21 +341,29 @@ public class PigHCatUtil {
   }
 
   public static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
-        if (objList == null){
-          return null;
-        }
-        Tuple t = tupFac.newTuple(objList.size());
-        List<HCatFieldSchema> subFields = hs.getFields();
-        for (int i = 0; i < subFields.size(); i++){
-          t.set(i,extractPigObject(objList.get(i), subFields.get(i)));
-        }
-        return t;
+    if (objList == null){
+      return null;
+    }
+    Tuple t = tupFac.newTuple(objList.size());
+    List<HCatFieldSchema> subFields = hs.getFields();
+    for (int i = 0; i < subFields.size(); i++){
+      t.set(i,extractPigObject(objList.get(i), subFields.get(i)));
+    }
+    return t;
   }
 
   public static Map<String,Object> transformToPigMap(Map<String,Object> map, HCatFieldSchema hfs) throws Exception {
-      return map;
+    if (map == null) {
+      return null;
     }
 
+    Map<String,Object> result = new HashMap<String, Object>();
+    for (Entry<String, Object> entry : map.entrySet()) {
+      result.put(entry.getKey(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
+    }
+    return result;
+  }
+
   @SuppressWarnings("unchecked")
   public static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
     if (list == null){
@@ -349,70 +371,55 @@ public class PigHCatUtil {
     }
 
     HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
-    if (elementSubFieldSchema.getType() == Type.STRUCT){
-      DataBag db = new DefaultDataBag();
-      for (Object o : list){
-        db.add(transformToTuple((List<Object>)o,elementSubFieldSchema));
+    DataBag db = new DefaultDataBag();
+    for (Object o : list){
+      Tuple tuple;
+      if (elementSubFieldSchema.getType() == Type.STRUCT){
+        tuple = transformToTuple((List<Object>)o, elementSubFieldSchema);
+      } else {
+        // bags always contain tuples
+        tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
       }
-      return db;
-    } else {
-      return  new HCatArrayBag(list);
+      db.add(tuple);
     }
+    return db;
   }
 
 
-  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
-      for (HCatFieldSchema hfs : hcatTableSchema.getFields()){
-          Type htype = hfs.getType();
-          if (htype == Type.ARRAY){
-              validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(hfs);
-          }else if (htype == Type.STRUCT){
-              validateIsPigCompatibleStructWithPrimitives(hfs);
-          }else if (htype == Type.MAP){
-              validateIsPigCompatibleMapWithPrimitives(hfs);
-          }else {
-              validateIsPigCompatiblePrimitive(hfs);
-          }
-      }
-  }
-
-  private static void validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(
-          HCatFieldSchema hfs) throws IOException {
-      HCatFieldSchema subFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
-      if (subFieldSchema.getType() == Type.STRUCT){
-          validateIsPigCompatibleStructWithPrimitives(subFieldSchema);
-      }else if (subFieldSchema.getType() == Type.MAP) {
-          validateIsPigCompatiblePrimitive(subFieldSchema.getMapValueSchema().getFields().get(0));
-      }else {
-          validateIsPigCompatiblePrimitive(subFieldSchema);
-      }
-  }
-
-  private static void validateIsPigCompatibleMapWithPrimitives(HCatFieldSchema hfs) throws IOException{
-      if (hfs.getMapKeyType() != Type.STRING){
-          throw new PigException("Incompatible type in schema, found map with " +
-                  "non-string key type in :"+hfs.getTypeString(), PIG_EXCEPTION_CODE);
-      }
-      validateIsPigCompatiblePrimitive(hfs.getMapValueSchema().getFields().get(0));
+  private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
+    for(HCatFieldSchema hcatField : tblSchema.getFields()){
+      validateHcatFieldFollowsPigRules(hcatField);
+    }
   }
 
-  private static void validateIsPigCompatibleStructWithPrimitives(HCatFieldSchema hfs) throws IOException {
-      for ( HCatFieldSchema subField : hfs.getStructSubSchema().getFields()){
-          validateIsPigCompatiblePrimitive(subField);
+  private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
+    try {
+      Type hType = hcatField.getType();
+      switch(hType){
+      // We don't do type promotion/demotion.
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      case ARRAY:
+        validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
+        break;
+      case STRUCT:
+        validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
+        break;
+      case MAP:
+        // key is only string
+        validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
+        break;
       }
+    } catch (HCatException e) {
+      throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
   }
 
-  private static void validateIsPigCompatiblePrimitive(HCatFieldSchema hfs) throws IOException {
-      Type htype = hfs.getType();
-      if (
-              (hfs.isComplex()) ||
-              (htype == Type.TINYINT) ||
-              (htype == Type.SMALLINT)
-              ){
-            throw new PigException("Incompatible type in schema, expected pig " +
-                      "compatible primitive for:" + hfs.getTypeString());
-          }
 
+  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
+    validateHCatSchemaFollowsPigRules(hcatTableSchema);
   }
 
   public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileInputDriver.java Fri Feb 10 13:59:58 2012
@@ -156,6 +156,10 @@ public class RCFileInputDriver extends H
     // with crisp Java objects inside it. We have to do it because higher layer
     // may not know how to do it.
 
+    if (data == null) {
+      return null;
+    }
+
     switch(oi.getCategory()){
 
     case PRIMITIVE:

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java?rev=1242773&r1=1242772&r2=1242773&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/shims/HCatHadoopShims.java Fri Feb 10 13:59:58 2012
@@ -48,7 +48,7 @@ public interface HCatHadoopShims {
 						Class.forName(shimFQN).asSubclass(HCatHadoopShims.class);
 				return clasz.newInstance();
 			} catch (Exception e) {
-				throw new RuntimeException("Failed to instantiate: " + shimFQN);
+				throw new RuntimeException("Failed to instantiate: " + shimFQN, e);
 			}
 		}
 	}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MockLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MockLoader.java?rev=1242773&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MockLoader.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/MockLoader.java Fri Feb 10 13:59:58 2012
@@ -0,0 +1,159 @@
+package org.apache.hcatalog.pig;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+
+public class MockLoader extends LoadFunc {
+  private static final class MockRecordReader extends RecordReader<Object, Object> {
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException, InterruptedException {
+      return "mockKey";
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException, InterruptedException {
+      return "mockValue";
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return 0.5f;
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
+        InterruptedException {
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException, InterruptedException {
+      return true;
+    }
+  }
+
+  private static final class MockInputSplit extends InputSplit implements Writable  {
+    private String location;
+    public MockInputSplit() {
+    }
+    public MockInputSplit(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public String[] getLocations() throws IOException, InterruptedException {
+      return new String[] { location };
+    }
+
+    @Override
+    public long getLength() throws IOException, InterruptedException {
+      return 10000000;
+    }
+
+    @Override
+    public boolean equals(Object arg0) {
+      return arg0==this;
+    }
+
+    @Override
+    public int hashCode() {
+      return location.hashCode();
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+      location = arg0.readUTF();
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+      arg0.writeUTF(location);
+    }
+  }
+
+  private static final class MockInputFormat extends InputFormat {
+
+    private final String location;
+
+    public MockInputFormat(String location) {
+      this.location = location;
+    }
+
+    @Override
+    public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+        throws IOException, InterruptedException {
+      return new MockRecordReader();
+    }
+
+    @Override
+    public List getSplits(JobContext arg0) throws IOException, InterruptedException {
+      return Arrays.asList(new MockInputSplit(location));
+    }
+  }
+
+  private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
+
+  public static void setData(String location, Iterable<Tuple> data) {
+    locationToData.put(location, data);
+  }
+
+  private String location;
+
+  private Iterator<Tuple> data;
+
+  @Override
+  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+    return location;
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    this.location = location;
+    if (location == null) {
+      throw new IOException("null location passed to MockLoader");
+    }
+    this.data = locationToData.get(location).iterator();
+    if (this.data == null) {
+      throw new IOException("No data configured for location: "+location);
+    }
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    if (data == null) {
+      throw new IOException("data was not correctly initialized in MockLoader");
+    }
+    return data.hasNext() ? data.next() : null;
+  }
+
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+    return new MockInputFormat(location);
+  }
+
+  @Override
+  public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
+  }
+
+}
\ No newline at end of file

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java?rev=1242773&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoaderComplexSchema.java Fri Feb 10 13:59:58 2012
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestHCatLoaderComplexSchema {
+
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+  private static Driver driver;
+  private static Properties props;
+
+  private void dropTable(String tablename) throws IOException, CommandNeedRetryException{
+    driver.run("drop table "+tablename);
+  }
+
+  private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException{
+    String createTable;
+    createTable = "create table "+tablename+"("+schema+") ";
+    if ((partitionedBy != null)&&(!partitionedBy.trim().isEmpty())){
+      createTable = createTable + "partitioned by ("+partitionedBy+") ";
+    }
+    createTable = createTable + "stored as RCFILE tblproperties('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," +
+        "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    System.out.println("Creating table:\n"+createTable);
+    CommandProcessorResponse result = driver.run(createTable);
+    int retCode = result.getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+" "+result.getErrorMessage()+"]");
+    }
+  }
+
+  private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException{
+    createTable(tablename,schema,null);
+  }
+
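+  /**
+   * Starts an embedded Hive Driver and CLI session for the DDL issued by the
+   * tests, with pre/post execution hooks and concurrency support disabled,
+   * and points Pig at the MiniCluster filesystem via fs.default.name.
+   */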
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    HiveConf hiveConf = new HiveConf(TestHCatLoaderComplexSchema.class);
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+
+  }
+
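+  /** Shorthand helpers for building the Pig Tuples and DataBags used as test data. */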
+  private static final TupleFactory tf = TupleFactory.getInstance();
+  private static final BagFactory bf = BagFactory.getInstance();
+  private Tuple t(Object... objects) {
+    return tf.newTuple(Arrays.asList(objects));
+  }
+  private DataBag b(Tuple... objects) {
+    return bf.newDefaultBag(Arrays.asList(objects));
+  }
+
+  /**
+   * Tests an artificially complex nested schema to exercise nested schema conversion.
+   * @throws Exception
+   */
+  @Test
+  public void testSyntheticComplexSchema() throws Exception {
+    String pigSchema =
+        "(" +
+          "a: " +
+            "(" +
+              "aa: chararray, " +
+              "ab: long, " +
+              "ac: map[], " +
+              "ad: { t: (ada: long) }, " +
+              "ae: { t: (aea:long, aeb: ( aeba: chararray, aebb: long)) }," +
+              "af: (afa: chararray, afb: long) " +
+            ")," +
+          "b: chararray, " +
+          "c: long, " +
+          "d:  { t: (da:long, db: ( dba: chararray, dbb: long), dc: { t: (dca: long) } ) } " +
+        ")";
+
+    // with extra structs
+    String tableSchema =
+        "a struct<" +
+            "aa: string, " +
+            "ab: bigint, " +
+            "ac: map<string, string>, " +
+            "ad: array<struct<ada:bigint>>, " +
+            "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+            "af: struct<afa: string, afb: bigint> " +
+            ">, " +
+        "b string, " +
+        "c bigint, " +
+        "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<struct<dca: bigint>>>>";
+
+    // without extra structs
+    String tableSchema2 =
+        "a struct<" +
+            "aa: string, " +
+            "ab: bigint, " +
+            "ac: map<string, string>, " +
+            "ad: array<bigint>, " +
+            "ae: array<struct<aea:bigint, aeb: struct<aeba: string, aebb: bigint>>>," +
+            "af: struct<afa: string, afb: bigint> " +
+            ">, " +
+        "b string, " +
+        "c bigint, " +
+        "d array<struct<da: bigint, db: struct<dba:string, dbb:bigint>, dc: array<bigint>>>";
+
+    List<Tuple> data = new ArrayList<Tuple>();
+    for (int i = 0; i < 10; i++) {
+      Tuple t = t(
+          t(
+              "aa test",
+              2l,
+              new HashMap<String, String>() {{put("ac test1", "test 1");put("ac test2", "test 2");}},
+              b(t(3l), t(4l)),
+              b(t(5l, t("aeba test", 6l))),
+              t("afa test", 7l)
+          ),
+          "b test",
+          (long)i,
+          b(t(8l, t("dba test", 9l), b(t(10l)))));
+
+      data.add(t);
+    }
+    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, true);
+    verifyWriteRead("testSyntheticComplexSchema", pigSchema, tableSchema, data, false);
+    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, true);
+    verifyWriteRead("testSyntheticComplexSchema2", pigSchema, tableSchema2, data, false);
+
+  }
+
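+  /**
+   * Round-trips the given tuples: loads them through MockLoader with the Pig
+   * schema, stores them into a Hive table of the given schema with HCatStorer
+   * (optionally passing the Pig schema to the storer), then re-reads the table
+   * with HCatLoader and asserts that both the data and the schema (ignoring
+   * field names) survive the conversion.
+   */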
+  private void verifyWriteRead(String tablename, String pigSchema, String tableSchema, List<Tuple> data, boolean provideSchemaToStorer)
+      throws IOException, CommandNeedRetryException, ExecException, FrontendException {
+    MockLoader.setData(tablename+"Input", data);
+
+    try {
+      createTable(tablename, tableSchema);
+      PigServer server = new PigServer(ExecType.LOCAL, props);
+      server.setBatchOn();
+      server.registerQuery("A = load '"+tablename+"Input' using org.apache.hcatalog.pig.MockLoader() AS "+pigSchema+";");
+      Schema dumpedASchema = server.dumpSchema("A");
+      server.registerQuery("STORE A into '"+tablename+"' using org.apache.hcatalog.pig.HCatStorer("
+          + (provideSchemaToStorer ? "'', '"+pigSchema+"'" : "")
+          + ");");
+      ExecJob execJob = server.executeBatch().get(0);
+      if (!execJob.getStatistics().isSuccessful()) {
+        throw new RuntimeException("Import failed", execJob.getException());
+      }
+
+      // test that schema was loaded correctly
+      server.registerQuery("X = load '"+tablename+"' using org.apache.hcatalog.pig.HCatLoader();");
+      Iterator<Tuple> it = server.openIterator("X");
+      int i = 0;
+      while (it.hasNext()) {
+        Tuple input = data.get(i++);
+        Tuple output = it.next();
+        Assert.assertEquals(input.toString(), output.toString());
+        System.out.println(output);
+      }
+      Schema dumpedXSchema = server.dumpSchema("X");
+
+      Assert.assertEquals(
+          "expected " + dumpedASchema + " but was " + dumpedXSchema + " (ignoring field names)",
+          "",
+          compareIgnoreFieldNames(dumpedASchema, dumpedXSchema));
+
+    } finally {
+      dropTable(tablename);
+    }
+  }
+
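+  /**
+   * Recursively compares two Pig schemas by position and type only, ignoring
+   * field names; returns an empty string when they match, otherwise a
+   * description of the differences found.
+   */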
+  private String compareIgnoreFieldNames(Schema expected, Schema got) throws FrontendException {
+    if (expected == null || got == null) {
+      if (expected == got) {
+        return "";
+      } else {
+        return "\nexpected "+expected+" got "+got;
+      }
+    }
+    if (expected.size() != got.size()) {
+      return "\nsize expected "+expected.size()+" ("+expected+") got "+got.size()+" ("+got+")";
+    }
+    String message = "";
+    for (int i = 0; i < expected.size(); i++) {
+      FieldSchema expectedField = expected.getField(i);
+      FieldSchema gotField = got.getField(i);
+      if (expectedField.type != gotField.type) {
+        message += "\ntype expected "+expectedField.type+" ("+expectedField+") got "+gotField.type+" ("+gotField+")";
+      } else {
+        message += compareIgnoreFieldNames(expectedField.schema, gotField.schema);
+      }
+    }
+    return message;
+  }
+
+  /**
+   * Tests that unnecessary tuples are dropped while converting the schema
+   * (Pig requires Tuples in Bags)
+   * @throws Exception
+   */
+  @Test
+  public void testTupleInBagInTupleInBag() throws Exception {
+    String pigSchema = "(a: { b : ( c: { d: (i : long) } ) })";
+
+    String tableSchema = "a array< array< bigint > >";
+
+    List<Tuple> data = new ArrayList<Tuple>();
+    data.add(t(b(t(b(t(100l),t(101l))), t(b(t(110l))))));
+    data.add(t(b(t(b(t(200l))), t(b(t(210l))), t(b(t(220l))))));
+    data.add(t(b(t(b(t(300l),t(301l))))));
+    data.add(t(b(t(b(t(400l))), t(b(t(410l),t(411l),t(412l))))));
+
+
+    verifyWriteRead("TupleInBagInTupleInBag1", pigSchema, tableSchema, data, true);
+    verifyWriteRead("TupleInBagInTupleInBag2", pigSchema, tableSchema, data, false);
+
+    // test that we don't drop the unnecessary tuple if the table has the corresponding Struct
+    String tableSchema2 = "a array< struct< c: array< struct< i: bigint > > > >";
+
+    verifyWriteRead("TupleInBagInTupleInBag3", pigSchema, tableSchema2, data, true);
+    verifyWriteRead("TupleInBagInTupleInBag4", pigSchema, tableSchema2, data, false);
+
+  }
+
+  @Test
+  public void testMapWithComplexData() throws Exception {
+    String pigSchema = "(a: long, b: map[])";
+    String tableSchema = "a bigint, b map<string, struct<aa:bigint, ab:string>>";
+
+    List<Tuple> data = new ArrayList<Tuple>();
+    for (int i = 0; i < 10; i++) {
+      Tuple t = t(
+          (long)i,
+          new HashMap<String, Object>() {{put("b test 1", t(1l,"test 1"));put("b test 2", t(2l, "test 2"));}});
+
+      data.add(t);
+    }
+    verifyWriteRead("testMapWithComplexData", pigSchema, tableSchema, data, true);
+    verifyWriteRead("testMapWithComplexData2", pigSchema, tableSchema, data, false);
+
+  }
+}


