hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From khorg...@apache.org
Subject svn commit: r1564924 [2/2] - in /hive/trunk: ./ hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/ hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/ hcatalog/core/src/...
Date Wed, 05 Feb 2014 21:02:58 GMT
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatBaseStorer.java Wed Feb  5 21:02:57 2014
@@ -20,14 +20,27 @@
 package org.apache.hive.hcatalog.pig;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordWriter;
@@ -52,6 +65,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +89,22 @@ abstract class HCatBaseStorer extends St
   private RecordWriter<WritableComparable<?>, HCatRecord> writer;
   protected HCatSchema computedSchema;
   protected static final String PIG_SCHEMA = "hcat.pig.store.schema";
+  /**
+   * Controls what happens when incoming Pig value is out-of-range for target Hive column
+   */
+  static final String ON_OOR_VALUE_OPT = "onOutOfRangeValue";
+  /**
+   * prop name in Configuration/context
+   */
+  static final String ON_OORA_VALUE_PROP = "hcat.pig.store.onoutofrangevalue";
+  /**
+   * valid values for ON_OOR_VALUE_OPT
+   */
+  public static enum  OOR_VALUE_OPT_VALUES {Null, Throw}
   protected String sign;
+  //it's key that this is a per HCatStorer instance object
+  private final DataLossLogger dataLossLogger = new DataLossLogger();
+  private final OOR_VALUE_OPT_VALUES onOutOfRange;
 
   public HCatBaseStorer(String partSpecs, String schema) throws Exception {
 
@@ -95,12 +124,15 @@ abstract class HCatBaseStorer extends St
       }
     }
 
-    if (schema != null) {
+    if (schema != null && !schema.trim().isEmpty()) {
       pigSchema = Utils.getSchemaFromString(schema);
     }
-
+    Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
+    onOutOfRange = OOR_VALUE_OPT_VALUES.valueOf(udfProps.getProperty(ON_OORA_VALUE_PROP, getDefaultValue().name()));
+  }
+  static OOR_VALUE_OPT_VALUES getDefaultValue() {
+    return OOR_VALUE_OPT_VALUES.Null;
   }
-
   @Override
   public void checkSchema(ResourceSchema resourceSchema) throws IOException {
 
@@ -123,17 +155,26 @@ abstract class HCatBaseStorer extends St
    * schema of the table in metastore.
    */
   protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("convertPigSchemaToHCatSchema(pigSchema,tblSchema)=(" + pigSchema + "," + tableSchema + ")");
+    }
     List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
     for (FieldSchema fSchema : pigSchema.getFields()) {
       try {
         HCatFieldSchema hcatFieldSchema = getColFromSchema(fSchema.alias, tableSchema);
-
-        fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema));
+        //if writing to a partitioned table, then pigSchema will have more columns than tableSchema
+        //partition columns are not part of tableSchema... e.g. TestHCatStorer#testPartColsInData()
+//        HCatUtil.assertNotNull(hcatFieldSchema, "Nothing matching '" + fSchema.alias + "' found " +
+//                "in target table schema", LOG);
+        fieldSchemas.add(getHCatFSFromPigFS(fSchema, hcatFieldSchema, pigSchema, tableSchema));
       } catch (HCatException he) {
         throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
       }
     }
-    return new HCatSchema(fieldSchemas);
+    
+    HCatSchema s = new HCatSchema(fieldSchemas);
+    LOG.debug("convertPigSchemaToHCatSchema(computed)=(" + s + ")");
+    return s;
   }
 
   public static boolean removeTupleFromBag(HCatFieldSchema hcatFieldSchema, FieldSchema bagFieldSchema) throws HCatException {
@@ -147,42 +188,60 @@ abstract class HCatBaseStorer extends St
     }
     return false;
   }
-
-
-  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema) throws FrontendException, HCatException {
+  /**
+   * Here we are processing HCat table schema as derived from metastore, 
+   * thus it should have information about all fields/sub-fields, but not for partition columns
+   */
+  private HCatFieldSchema getHCatFSFromPigFS(FieldSchema fSchema, HCatFieldSchema hcatFieldSchema,
+                                             Schema pigSchema, HCatSchema tableSchema)
+          throws FrontendException, HCatException {
+    if(hcatFieldSchema == null) {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("hcatFieldSchema is null for fSchema '" + fSchema.alias + "'");
+        //throw new IllegalArgumentException("hcatFiledSchema is null; fSchema=" + fSchema + " " +
+        //      "(pigSchema, tableSchema)=(" + pigSchema + "," + tableSchema + ")");
+      }
+    }
     byte type = fSchema.type;
     switch (type) {
 
     case DataType.CHARARRAY:
     case DataType.BIGCHARARRAY:
-      return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
-
+      if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) {
+        return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
+      }
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, null);
     case DataType.INTEGER:
       if (hcatFieldSchema != null) {
         if (!SUPPORTED_INTEGER_CONVERSIONS.contains(hcatFieldSchema.getType())) {
           throw new FrontendException("Unsupported type: " + type + "  in Pig's schema",
             PigHCatUtil.PIG_EXCEPTION_CODE);
         }
-        return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getType(), null);
-      } else {
-        return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+        return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
       }
-
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.intTypeInfo, null);
     case DataType.LONG:
-      return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
-
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.longTypeInfo, null);
     case DataType.FLOAT:
-      return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
-
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.floatTypeInfo, null);
     case DataType.DOUBLE:
-      return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
-
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.doubleTypeInfo, null);
     case DataType.BYTEARRAY:
-      return new HCatFieldSchema(fSchema.alias, Type.BINARY, null);
-
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.binaryTypeInfo, null);
     case DataType.BOOLEAN:
-      return new HCatFieldSchema(fSchema.alias, Type.BOOLEAN, null);
-
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.booleanTypeInfo, null);
+    case DataType.DATETIME:
+      //Pig DATETIME can map to DATE or TIMESTAMP (see HCatBaseStorer#validateSchema()) which
+      //is controlled by Hive target table information
+      if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) {
+        return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
+      }
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.timestampTypeInfo, null);
+    case DataType.BIGDECIMAL:
+      if(hcatFieldSchema != null && hcatFieldSchema.getTypeInfo() != null) {
+        return new HCatFieldSchema(fSchema.alias, hcatFieldSchema.getTypeInfo(), null);
+      }
+      return new HCatFieldSchema(fSchema.alias, TypeInfoFactory.decimalTypeInfo, null);
     case DataType.BAG:
       Schema bagSchema = fSchema.schema;
       List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
@@ -193,21 +252,18 @@ abstract class HCatBaseStorer extends St
       } else {
         field = bagSchema.getField(0);
       }
-      arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema.getArrayElementSchema().get(0)));
+      arrFields.add(getHCatFSFromPigFS(field, hcatFieldSchema == null ? null : hcatFieldSchema
+              .getArrayElementSchema().get(0), pigSchema, tableSchema));
       return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
-
     case DataType.TUPLE:
-      List<String> fieldNames = new ArrayList<String>();
       List<HCatFieldSchema> hcatFSs = new ArrayList<HCatFieldSchema>();
       HCatSchema structSubSchema = hcatFieldSchema == null ? null : hcatFieldSchema.getStructSubSchema();
       List<FieldSchema> fields = fSchema.schema.getFields();
       for (int i = 0; i < fields.size(); i++) {
         FieldSchema fieldSchema = fields.get(i);
-        fieldNames.add(fieldSchema.alias);
-        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i)));
+        hcatFSs.add(getHCatFSFromPigFS(fieldSchema, structSubSchema == null ? null : structSubSchema.get(i), pigSchema, tableSchema));
       }
       return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(hcatFSs), "");
-
     case DataType.MAP: {
       // Pig's schema contain no type information about map's keys and
       // values. So, if its a new column assume <string,string> if its existing
@@ -217,15 +273,18 @@ abstract class HCatBaseStorer extends St
       List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
 
       if (hcatFieldSchema != null) {
-        return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, hcatFieldSchema.getMapValueSchema(), "");
+        return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias, hcatFieldSchema.getMapKeyTypeInfo(), 
+          hcatFieldSchema.getMapValueSchema(), "");
       }
 
       // Column not found in target table. Its a new column. Its schema is map<string,string>
-      valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+      valFS = new HCatFieldSchema(fSchema.alias, TypeInfoFactory.stringTypeInfo, "");
       valFSList.add(valFS);
-      return new HCatFieldSchema(fSchema.alias, Type.MAP, Type.STRING, new HCatSchema(valFSList), "");
+      return HCatFieldSchema.createMapTypeFieldSchema(fSchema.alias,
+        TypeInfoFactory.stringTypeInfo, new HCatSchema(valFSList), "");
     }
-
+    case DataType.BIGINTEGER:
+      //fall through; doesn't map to Hive/Hcat type; here for completeness
     default:
       throw new FrontendException("Unsupported type: " + type + "  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
     }
@@ -253,24 +312,22 @@ abstract class HCatBaseStorer extends St
     }
   }
 
+  /**
+   * Convert from Pig value object to Hive value object
+   * This method assumes that {@link #validateSchema(org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema, org.apache.hive.hcatalog.data.schema.HCatFieldSchema, org.apache.pig.impl.logicalLayer.schema.Schema, org.apache.hive.hcatalog.data.schema.HCatSchema, int)}
+   * which checks the types in Pig schema are compatible with target Hive table, has been called.
+   */
   private Object getJavaObj(Object pigObj, HCatFieldSchema hcatFS) throws HCatException, BackendException {
     try {
-
+      if(pigObj == null) return null;
       // The real work-horse. Spend time and energy in this method if there is
       // need to keep HCatStorer lean and go fast.
       Type type = hcatFS.getType();
       switch (type) {
-
       case BINARY:
-        if (pigObj == null) {
-          return null;
-        }
         return ((DataByteArray) pigObj).get();
 
       case STRUCT:
-        if (pigObj == null) {
-          return null;
-        }
         HCatSchema structSubSchema = hcatFS.getStructSubSchema();
         // Unwrap the tuple.
         List<Object> all = ((Tuple) pigObj).getAll();
@@ -281,9 +338,6 @@ abstract class HCatBaseStorer extends St
         return converted;
 
       case ARRAY:
-        if (pigObj == null) {
-          return null;
-        }
         // Unwrap the bag.
         DataBag pigBag = (DataBag) pigObj;
         HCatFieldSchema tupFS = hcatFS.getArrayElementSchema().get(0);
@@ -298,9 +352,6 @@ abstract class HCatBaseStorer extends St
         }
         return bagContents;
       case MAP:
-        if (pigObj == null) {
-          return null;
-        }
         Map<?, ?> pigMap = (Map<?, ?>) pigObj;
         Map<Object, Object> typeMap = new HashMap<Object, Object>();
         for (Entry<?, ?> entry : pigMap.entrySet()) {
@@ -318,29 +369,18 @@ abstract class HCatBaseStorer extends St
       case DOUBLE:
         return pigObj;
       case SMALLINT:
-        if (pigObj == null) {
-          return null;
-        }
         if ((Integer) pigObj < Short.MIN_VALUE || (Integer) pigObj > Short.MAX_VALUE) {
-          throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
-            hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+          handleOutOfRangeValue(pigObj, hcatFS);
+          return null;
         }
         return ((Integer) pigObj).shortValue();
       case TINYINT:
-        if (pigObj == null) {
-          return null;
-        }
         if ((Integer) pigObj < Byte.MIN_VALUE || (Integer) pigObj > Byte.MAX_VALUE) {
-          throw new BackendException("Value " + pigObj + " is outside the bounds of column " +
-            hcatFS.getName() + " with type " + hcatFS.getType(), PigHCatUtil.PIG_EXCEPTION_CODE);
+          handleOutOfRangeValue(pigObj, hcatFS);
+          return null;
         }
         return ((Integer) pigObj).byteValue();
       case BOOLEAN:
-        if (pigObj == null) {
-          LOG.debug( "HCatBaseStorer.getJavaObj(BOOLEAN): obj null, bailing early" );
-          return null;
-        }
-
         if( pigObj instanceof String ) {
           if( ((String)pigObj).trim().compareTo("0") == 0 ) {
             return Boolean.FALSE;
@@ -348,24 +388,86 @@ abstract class HCatBaseStorer extends St
           if( ((String)pigObj).trim().compareTo("1") == 0 ) {
             return Boolean.TRUE;
           }
-
-          throw new BackendException(
-            "Unexpected type " + type + " for value " + pigObj
-            + (pigObj == null ? "" : " of class "
-            + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+          throw new BackendException("Unexpected type " + type + " for value " + pigObj
+            + " of class " + pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE);
         }
-
         return Boolean.parseBoolean( pigObj.toString() );
+      case DECIMAL:
+        BigDecimal bd = (BigDecimal)pigObj;
+        DecimalTypeInfo dti = (DecimalTypeInfo)hcatFS.getTypeInfo();
+        if(bd.precision() > dti.precision() || bd.scale() > dti.scale()) {
+          handleOutOfRangeValue(pigObj, hcatFS);
+          return null;
+        }
+        return HiveDecimal.create(bd);
+      case CHAR:
+        String charVal = (String)pigObj;
+        CharTypeInfo cti = (CharTypeInfo)hcatFS.getTypeInfo(); 
+        if(charVal.length() > cti.getLength()) {
+          handleOutOfRangeValue(pigObj, hcatFS);
+          return null;
+        }
+        return new HiveChar(charVal, cti.getLength());
+      case VARCHAR:
+        String varcharVal = (String)pigObj;
+        VarcharTypeInfo vti = (VarcharTypeInfo)hcatFS.getTypeInfo();
+        if(varcharVal.length() > vti.getLength()) {
+          handleOutOfRangeValue(pigObj, hcatFS);
+          return null;
+        }
+        return new HiveVarchar(varcharVal, vti.getLength());
+      case TIMESTAMP:
+        DateTime dt = (DateTime)pigObj;
+        return new Timestamp(dt.getMillis());//getMillis() returns UTC time regardless of TZ
+      case DATE:
+        /**
+         * We ignore any TZ setting on Pig value since java.sql.Date doesn't have it (in any
+         * meaningful way).  So the assumption is that if Pig value has 0 time component (midnight)
+         * we assume it reasonably 'fits' into a Hive DATE.  If time part is not 0, it's considered
+         * out of range for target type.
+         */
+        DateTime dateTime = ((DateTime)pigObj);
+        if(dateTime.getMillisOfDay() != 0) {
+          handleOutOfRangeValue(pigObj, hcatFS, "Time component must be 0 (midnight) in local timezone; Local TZ val='" + pigObj + "'");
+          return null;
+        }
+        /*java.sql.Date is a poorly defined API.  Some (all?) SerDes call toString() on it
+        [e.g. LazySimpleSerDe, uses LazyUtils.writePrimitiveUTF8()],  which automatically adjusts
+          for local timezone.  Date.valueOf() also uses local timezone (as does Date(int,int,int)).
+          Also see PigHCatUtil#extractPigObject() for corresponding read op.  This way a DATETIME from Pig,
+          when stored into Hive and read back comes back with the same value.*/
+        return new Date(dateTime.getYear() - 1900, dateTime.getMonthOfYear() - 1, dateTime.getDayOfMonth());
       default:
-        throw new BackendException("Unexpected type " + type + " for value " + pigObj
-          + (pigObj == null ? "" : " of class "
-          + pigObj.getClass().getName()), PigHCatUtil.PIG_EXCEPTION_CODE);
+        throw new BackendException("Unexpected HCat type " + type + " for value " + pigObj
+          + " of class " + pigObj.getClass().getName(), PigHCatUtil.PIG_EXCEPTION_CODE);
       }
     } catch (BackendException e) {
       // provide the path to the field in the error message
       throw new BackendException(
-        (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(),
-        e.getCause() == null ? e : e.getCause());
+        (hcatFS.getName() == null ? " " : hcatFS.getName() + ".") + e.getMessage(), e);
+    }
+  }
+
+  private void handleOutOfRangeValue(Object pigObj, HCatFieldSchema hcatFS) throws BackendException {
+    handleOutOfRangeValue(pigObj, hcatFS, null);
+  }
+  /**
+   * depending on user config, throws an exception or logs a msg if the incoming Pig value is
+   * out-of-range for target type.
+   * @param additionalMsg may be {@code null} 
+   */
+  private void handleOutOfRangeValue(Object pigObj, HCatFieldSchema hcatFS, String additionalMsg) throws BackendException {
+    String msg = "Pig value '" + pigObj + "' is outside the bounds of column " + hcatFS.getName() +
+      " with type " + (hcatFS.getTypeInfo() == null ? hcatFS.getType() : hcatFS.getTypeInfo().getTypeName()) +
+      (additionalMsg == null ? "" : "[" + additionalMsg + "]");
+    switch (onOutOfRange) {
+      case Throw:
+        throw new BackendException(msg, PigHCatUtil.PIG_EXCEPTION_CODE);
+      case Null:
+        dataLossLogger.logDataLossMsg(hcatFS, pigObj, msg);
+        break;
+      default:
+        throw new BackendException("Unexpected " + ON_OOR_VALUE_OPT + " value: '" + onOutOfRange + "'");
     }
   }
 
@@ -387,10 +489,10 @@ abstract class HCatBaseStorer extends St
 
     // Iterate through all the elements in Pig Schema and do validations as
     // dictated by semantics, consult HCatSchema of table when need be.
-
+    int columnPos = 0;//helps with debug messages
     for (FieldSchema pigField : pigSchema.getFields()) {
       HCatFieldSchema hcatField = getColFromSchema(pigField.alias, tblSchema);
-      validateSchema(pigField, hcatField);
+      validateSchema(pigField, hcatField, pigSchema, tblSchema, columnPos++);
     }
 
     try {
@@ -400,8 +502,14 @@ abstract class HCatBaseStorer extends St
     }
   }
 
-
-  private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField)
+  /**
+   * This method encodes which Pig type can map to (i.e. be stored in) which HCat type.
+   * @throws HCatException
+   * @throws FrontendException
+   */
+  private void validateSchema(FieldSchema pigField, HCatFieldSchema hcatField, 
+                              Schema topLevelPigSchema, HCatSchema topLevelHCatSchema, 
+                              int columnPos)
     throws HCatException, FrontendException {
     validateAlias(pigField.alias);
     byte type = pigField.type;
@@ -420,14 +528,16 @@ abstract class HCatBaseStorer extends St
       case DataType.BAG:
         HCatSchema arrayElementSchema = hcatField == null ? null : hcatField.getArrayElementSchema();
         for (FieldSchema innerField : pigField.schema.getField(0).schema.getFields()) {
-          validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema));
+          validateSchema(innerField, getColFromSchema(pigField.alias, arrayElementSchema), 
+                  topLevelPigSchema, topLevelHCatSchema, columnPos);
         }
         break;
 
       case DataType.TUPLE:
         HCatSchema structSubSchema = hcatField == null ? null : hcatField.getStructSubSchema();
         for (FieldSchema innerField : pigField.schema.getFields()) {
-          validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema));
+          validateSchema(innerField, getColFromSchema(pigField.alias, structSubSchema),
+                  topLevelPigSchema, topLevelHCatSchema, columnPos);
         }
         break;
 
@@ -435,6 +545,66 @@ abstract class HCatBaseStorer extends St
         throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
       }
     }
+    else if(hcatField != null) {
+      //there is no point trying to validate further if we have no type info about target field
+      switch (type) {
+        case DataType.BIGDECIMAL:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.DECIMAL), hcatField, columnPos);
+          break;
+        case DataType.DATETIME:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.TIMESTAMP, Type.DATE), hcatField, columnPos);
+          break;
+        case DataType.BYTEARRAY:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.BINARY), hcatField, columnPos);
+          break;
+        case DataType.BIGINTEGER:
+          throwTypeMismatchException(type, Collections.<Type>emptyList(), hcatField, columnPos);
+          break;
+        case DataType.BOOLEAN:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.BOOLEAN), hcatField, columnPos);
+          break;
+        case DataType.CHARARRAY:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.STRING, Type.CHAR, Type.VARCHAR), 
+                  hcatField, columnPos);
+          break;
+        case DataType.DOUBLE:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.DOUBLE), hcatField, columnPos);
+          break;
+        case DataType.FLOAT:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.FLOAT), hcatField, columnPos);
+          break;
+        case DataType.INTEGER:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.INT, Type.BIGINT, 
+                  Type.TINYINT, Type.SMALLINT), hcatField, columnPos);
+          break;
+        case DataType.LONG:
+          throwTypeMismatchException(type, Lists.newArrayList(Type.BIGINT), hcatField, columnPos);
+          break;
+        default:
+          throw new FrontendException("'" + type + 
+                  "' Pig datatype in column " + columnPos + "(0-based) is not supported by HCat", 
+                  PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    }
+    else {
+      if(false) {
+        //see HIVE-6194
+      throw new FrontendException("(pigSch,hcatSchema)=(" + pigField + "," +
+              "" + hcatField + ") (topPig, topHcat)=(" + topLevelPigSchema + "," +
+              "" + topLevelHCatSchema + ")");
+      }
+    }
+  }
+  private static void throwTypeMismatchException(byte pigDataType,
+      List<Type> hcatRequiredType, HCatFieldSchema hcatActualField, 
+      int columnPos) throws FrontendException {
+    if(!hcatRequiredType.contains(hcatActualField.getType())) {
+      throw new FrontendException( 
+              "Pig '" + DataType.findTypeName(pigDataType) + "' type in column " + 
+              columnPos + "(0-based) cannot map to HCat '" + 
+              hcatActualField.getType() + "'type.  Target filed must be of HCat type {" +
+              StringUtils.join(hcatRequiredType, " or ") + "}");
+    }
   }
 
   private void validateAlias(String alias) throws FrontendException {
@@ -467,4 +637,23 @@ abstract class HCatBaseStorer extends St
   @Override
   public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
   }
+
+  /**
+   * todo: when job is complete, should print the msgCount table to log 
+   */
+  private static final class DataLossLogger {
+    private static final Map<String, Integer> msgCount = new HashMap<String, Integer>();
+    private static String getColumnTypeKey(HCatFieldSchema fieldSchema) {
+      return fieldSchema.getName() + "_" + (fieldSchema.getTypeInfo() == null ?
+        fieldSchema.getType() : fieldSchema.getTypeInfo());
+    }
+    private void logDataLossMsg(HCatFieldSchema fieldSchema, Object pigOjb, String msg) {
+      String key = getColumnTypeKey(fieldSchema);
+      if(!msgCount.containsKey(key)) {
+        msgCount.put(key, 0);
+        LOG.warn(msg + " " + "Will write NULL instead.  Only 1 such message per type/column is emitted.");
+      }
+      msgCount.put(key, msgCount.get(key) + 1);
+    }
+  }
 }

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java Wed Feb  5 21:02:57 2014
@@ -47,6 +47,8 @@ import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.impl.util.UDFContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Pig {@link org.apache.pig.LoadFunc} to read data from HCat
@@ -54,6 +56,7 @@ import org.apache.pig.impl.util.UDFConte
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HCatLoader extends HCatBaseLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(HCatLoader.class);
 
   private static final String PARTITION_FILTER = "partition.filter"; // for future use
 
@@ -171,6 +174,9 @@ public class HCatLoader extends HCatBase
         }
       }
     }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("outputSchema=" + outputSchema);
+    }
 
   }
 

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatStorer.java Wed Feb  5 21:02:57 2014
@@ -26,6 +26,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.classification.InterfaceAudience;
 import org.apache.hadoop.hive.common.classification.InterfaceStability;
@@ -45,6 +51,8 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * HCatStorer.
@@ -53,6 +61,7 @@ import org.apache.pig.impl.util.UDFConte
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public class HCatStorer extends HCatBaseStorer {
+  private static final Logger LOG = LoggerFactory.getLogger(HCatStorer.class);
 
   // Signature for wrapped storer, see comments in LoadFuncBasedInputDriver.initialize
   final public static String INNER_SIGNATURE = "hcatstorer.inner.signature";
@@ -60,18 +69,50 @@ public class HCatStorer extends HCatBase
   // A hash map which stores job credentials. The key is a signature passed by Pig, which is
   //unique to the store func and out file name (table, in our case).
   private static Map<String, Credentials> jobCredentials = new HashMap<String, Credentials>();
-
-
-  public HCatStorer(String partSpecs, String schema) throws Exception {
-    super(partSpecs, schema);
+  private final static Options validOptions = new Options();
+  static {
+    try {
+      populateValidOptions();
+    }
+    catch(Throwable t) {
+      LOG.error("Failed to build option list: ", t);
+      throw new RuntimeException(t);
+    }
   }
+  private final static CommandLineParser parser = new GnuParser();
 
+  /**
+   * @param optString may be an empty string (not null), in which case it's a no-op
+   */
+  public HCatStorer(String partSpecs, String pigSchema, String optString) throws Exception {
+    super(partSpecs, pigSchema);
+    String[] optsArr = optString.split(" ");
+    CommandLine configuredOptions;
+    try {
+      configuredOptions = parser.parse(validOptions, optsArr);
+    } catch (ParseException e) {
+      HelpFormatter formatter = new HelpFormatter();
+      formatter.printHelp( "[-" + ON_OOR_VALUE_OPT + "]", validOptions );
+      throw e;
+    }
+    Properties udfProps = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
+    //when the option is absent the default is getDefaultValue(), which returns 'Null' (not 'Throw')
+    //downstream code expects it to be set to a valid value
+    udfProps.put(ON_OORA_VALUE_PROP, configuredOptions.getOptionValue(ON_OOR_VALUE_OPT, getDefaultValue().name()));
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("setting " + configuredOptions.getOptionValue(ON_OOR_VALUE_OPT));
+    }
+    isValidOOROption((String)udfProps.get(ON_OORA_VALUE_PROP));
+  }
+  public HCatStorer(String partSpecs, String pigSchema) throws Exception {
+    this(partSpecs, pigSchema, "");
+  }
   public HCatStorer(String partSpecs) throws Exception {
-    this(partSpecs, null);
+    this(partSpecs, null, "");
   }
 
   public HCatStorer() throws Exception {
-    this(null, null);
+    this(null, null, "");
   }
 
   @Override
@@ -79,6 +120,33 @@ public class HCatStorer extends HCatBase
     return new HCatOutputFormat();
   }
 
+  /**
+   * makes a list of all options that HCatStorer understands
+   */
+  private static void populateValidOptions() {
+    validOptions.addOption(ON_OOR_VALUE_OPT, true, 
+      "Controls how store operation handles Pig values which are out of range for the target column" +
+      "in Hive table.  Default is to throw an exception.");
+  }
+  /**
+   * check that onOutOfRangeValue handling is configured properly
+   * @throws FrontendException
+   */
+  private static void isValidOOROption(String optVal) throws FrontendException {
+    boolean found = false;
+    for(OOR_VALUE_OPT_VALUES v : OOR_VALUE_OPT_VALUES.values()) {
+      if(v.name().equalsIgnoreCase(optVal)) {
+        found = true;
+        break;
+      }
+    }
+    if(!found) {
+      throw new FrontendException("Unexpected value for '" + ON_OOR_VALUE_OPT + "' found: " + optVal);
+    }
+  }
+  /**
+   * @param location databaseName.tableName
+   */
   @Override
   public void setStoreLocation(String location, Job job) throws IOException {
     HCatContext.INSTANCE.setConf(job.getConfiguration()).getConf().get()

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/PigHCatUtil.java Wed Feb  5 21:02:57 2014
@@ -20,8 +20,11 @@ package org.apache.hive.hcatalog.pig;
 
 
 import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +32,9 @@ import java.util.Map.Entry;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -56,6 +62,8 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.impl.util.Utils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -206,7 +214,7 @@ class PigHCatUtil {
       rfSchemaList.add(rfSchema);
     }
     ResourceSchema rSchema = new ResourceSchema();
-    rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
+    rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[rfSchemaList.size()]));
     return rSchema;
 
   }
@@ -266,7 +274,7 @@ class PigHCatUtil {
     } else if (arrayElementFieldSchema.getType() == Type.ARRAY) {
       ResourceSchema s = new ResourceSchema();
       List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
-      s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+      s.setFields(lrfs.toArray(new ResourceFieldSchema[lrfs.size()]));
       bagSubFieldSchemas[0].setSchema(s);
     } else {
       ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
@@ -276,8 +284,7 @@ class PigHCatUtil {
         .setSchema(null); // the element type is not a tuple - so no subschema
       bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
     }
-    ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
-    return s;
+    return new ResourceSchema().setFields(bagSubFieldSchemas);
 
   }
 
@@ -288,7 +295,7 @@ class PigHCatUtil {
     for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
       lrfs.add(getResourceSchemaFromFieldSchema(subField));
     }
-    s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+    s.setFields(lrfs.toArray(new ResourceFieldSchema[lrfs.size()]));
     return s;
   }
 
@@ -300,9 +307,16 @@ class PigHCatUtil {
   static public byte getPigType(HCatFieldSchema hfs) throws IOException {
     return getPigType(hfs.getType());
   }
-
+  /**
+   * Defines a mapping of HCatalog type to Pig type; not every mapping is exact, 
+   * see {@link #extractPigObject(Object, org.apache.hive.hcatalog.data.schema.HCatFieldSchema)}
+   * See http://pig.apache.org/docs/r0.12.0/basic.html#data-types
+   * See {@link org.apache.hive.hcatalog.pig.HCatBaseStorer#validateSchema(org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema, org.apache.hive.hcatalog.data.schema.HCatFieldSchema, org.apache.pig.impl.logicalLayer.schema.Schema, org.apache.hive.hcatalog.data.schema.HCatSchema, int)}
+   * for Pig->Hive type mapping.
+   */ 
   static public byte getPigType(Type type) throws IOException {
-    if (type == Type.STRING) {
+    if (type == Type.STRING || type == Type.CHAR || type == Type.VARCHAR) {
+      //CHARARRAY is unbounded so Hive->Pig is lossless
       return DataType.CHARARRAY;
     }
 
@@ -341,6 +355,14 @@ class PigHCatUtil {
     if (type == Type.BOOLEAN && pigHasBooleanSupport) {
       return DataType.BOOLEAN;
     }
+    if(type == Type.DECIMAL) {
+      //Hive is more restrictive, so Hive->Pig works
+      return DataType.BIGDECIMAL;
+    }
+    if(type == Type.DATE || type == Type.TIMESTAMP) {
+      //Hive Date is representable as Pig DATETIME
+      return DataType.DATETIME;
+    }
 
     throw new PigException("HCatalog column type '" + type.toString()
         + "' is not supported in Pig as a column type", PIG_EXCEPTION_CODE);
@@ -353,22 +375,54 @@ class PigHCatUtil {
     return transformToTuple(hr.getAll(), hs);
   }
 
-  @SuppressWarnings("unchecked")
+  /**
+   * Converts object from Hive's value system to Pig's value system
+   * see HCatBaseStorer#getJavaObj() for Pig->Hive conversion 
+   * @param o object from Hive value system
+   * @return object in Pig value system 
+   */
   public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
+    if(o == null) {
+      return null;
+    }
     Object result;
     Type itemType = hfs.getType();
     switch (itemType) {
     case BINARY:
-      result = (o == null) ? null : new DataByteArray((byte[]) o);
+      result = new DataByteArray((byte[]) o);
       break;
     case STRUCT:
-      result = transformToTuple((List<Object>) o, hfs);
+      result = transformToTuple((List<?>) o, hfs);
       break;
     case ARRAY:
-      result = transformToBag((List<? extends Object>) o, hfs);
+      result = transformToBag((List<?>) o, hfs);
       break;
     case MAP:
-      result = transformToPigMap((Map<Object, Object>) o, hfs);
+      result = transformToPigMap((Map<?,?>) o, hfs);
+      break;
+    case DECIMAL:
+      result = ((HiveDecimal)o).bigDecimalValue();
+      break;
+    case CHAR:
+      result = ((HiveChar)o).getValue();
+      break;
+    case VARCHAR:
+      result = ((HiveVarchar)o).getValue();
+      break;
+    case DATE:
+      /*java.sql.Date is weird.  It automatically adjusts its millis value to be in the local TZ
+      * e.g. d = new java.sql.Date(System.currentTimeMillis()).toString() so if you do this just after
+      * midnight in Palo Alto, you'll get yesterday's date printed out.*/
+      Date d = (Date)o;
+      result = new DateTime(d.getYear() + 1900, d.getMonth() + 1, d.getDate(), 0, 0);//uses local TZ
+      break;
+    case TIMESTAMP:
+      /*DATA TRUNCATION!!!
+       Timestamp may have nanos; we'll strip those away and create a Joda DateTime
+       object in local TZ; This is arbitrary, since Hive value doesn't have any TZ notion, but
+       we need to set something for TZ.
+       Timestamp is consistently in GMT (unless you call toString() on it) so we use millis*/
+      result = new DateTime(((Timestamp)o).getTime());//uses local TZ
       break;
     default:
       result = o;
@@ -377,7 +431,7 @@ class PigHCatUtil {
     return result;
   }
 
-  private static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
+  private static Tuple transformToTuple(List<?> objList, HCatFieldSchema hfs) throws Exception {
     try {
       return transformToTuple(objList, hfs.getStructSubSchema());
     } catch (Exception e) {
@@ -389,7 +443,7 @@ class PigHCatUtil {
     }
   }
 
-  private static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
+  private static Tuple transformToTuple(List<?> objList, HCatSchema hs) throws Exception {
     if (objList == null) {
       return null;
     }
@@ -401,21 +455,20 @@ class PigHCatUtil {
     return t;
   }
 
-  private static Map<String, Object> transformToPigMap(Map<Object, Object> map, HCatFieldSchema hfs) throws Exception {
+  private static Map<String, Object> transformToPigMap(Map<?, ?> map, HCatFieldSchema hfs) throws Exception {
     if (map == null) {
       return null;
     }
 
     Map<String, Object> result = new HashMap<String, Object>();
-    for (Entry<Object, Object> entry : map.entrySet()) {
+    for (Entry<?, ?> entry : map.entrySet()) {
       // since map key for Pig has to be Strings
       result.put(entry.getKey().toString(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
     }
     return result;
   }
 
-  @SuppressWarnings("unchecked")
-  private static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
+  private static DataBag transformToBag(List<?> list, HCatFieldSchema hfs) throws Exception {
     if (list == null) {
       return null;
     }
@@ -425,7 +478,7 @@ class PigHCatUtil {
     for (Object o : list) {
       Tuple tuple;
       if (elementSubFieldSchema.getType() == Type.STRUCT) {
-        tuple = transformToTuple((List<Object>) o, elementSubFieldSchema);
+        tuple = transformToTuple((List<?>) o, elementSubFieldSchema);
       } else {
         // bags always contain tuples
         tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java Wed Feb  5 21:02:57 2014
@@ -18,14 +18,11 @@
  */
 package org.apache.hive.hcatalog.pig;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.sql.Date;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -40,12 +37,14 @@ import org.apache.hadoop.hive.cli.CliSes
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
 import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.HcatTestUtils;
 import org.apache.hive.hcatalog.common.HCatUtil;
 import org.apache.hive.hcatalog.common.HCatConstants;
 import org.apache.hive.hcatalog.data.Pair;
+import org.apache.hive.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.ResourceStatistics;
@@ -53,11 +52,17 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.*;
 
 public class TestHCatLoader {
+  private static final Logger LOG = LoggerFactory.getLogger(TestHCatLoader.class);
   private static final String TEST_DATA_DIR = HCatUtil.makePathASafeFileName(System.getProperty("java.io.tmpdir") +
           File.separator + TestHCatLoader.class.getCanonicalName() + "-" + System.currentTimeMillis());
   private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
@@ -78,26 +83,46 @@ public class TestHCatLoader {
   }
 
   private void dropTable(String tablename) throws IOException, CommandNeedRetryException {
+    dropTable(tablename, driver);
+  }
+  static void dropTable(String tablename, Driver driver) throws IOException, CommandNeedRetryException {
     driver.run("drop table " + tablename);
   }
 
   private void createTable(String tablename, String schema, String partitionedBy) throws IOException, CommandNeedRetryException {
+    createTable(tablename, schema, partitionedBy, driver, storageFormat());
+  }
+  static void createTable(String tablename, String schema, String partitionedBy, Driver driver, String storageFormat) 
+      throws IOException, CommandNeedRetryException {
     String createTable;
     createTable = "create table " + tablename + "(" + schema + ") ";
     if ((partitionedBy != null) && (!partitionedBy.trim().isEmpty())) {
       createTable = createTable + "partitioned by (" + partitionedBy + ") ";
     }
-    createTable = createTable + "stored as " +storageFormat();
-    int retCode = driver.run(createTable).getResponseCode();
-    if (retCode != 0) {
-      throw new IOException("Failed to create table. [" + createTable + "], return code from hive driver : [" + retCode + "]");
-    }
+    createTable = createTable + "stored as " +storageFormat;
+    executeStatementOnDriver(createTable, driver);
   }
 
   private void createTable(String tablename, String schema) throws IOException, CommandNeedRetryException {
     createTable(tablename, schema, null);
   }
-
+  /**
+   * Execute Hive CLI statement
+   * @param cmd arbitrary statement to execute
+   */
+  static void executeStatementOnDriver(String cmd, Driver driver) throws IOException, CommandNeedRetryException {
+    LOG.debug("Executing: " + cmd);
+    CommandProcessorResponse cpr = driver.run(cmd);
+    if(cpr.getResponseCode() != 0) {
+      throw new IOException("Failed to execute \"" + cmd + "\". Driver returned " + cpr.getResponseCode() + " Error: " + cpr.getErrorMessage());
+    }
+  }
+  private static void checkProjection(FieldSchema fs, String expectedName, byte expectedPigType) {
+    assertEquals(fs.alias, expectedName);
+    assertEquals("Expected " + DataType.findTypeName(expectedPigType) + "; got " +
+      DataType.findTypeName(fs.type), expectedPigType, fs.type);
+  }
+  
   @Before
   public void setup() throws Exception {
 
@@ -127,6 +152,7 @@ public class TestHCatLoader {
 
     createTable(PARTITIONED_TABLE, "a int, b string", "bkt string");
     createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
+    AllTypesTable.setupAllTypesTable(driver);
 
     int LOOP_SIZE = 3;
     String[] input = new String[LOOP_SIZE * LOOP_SIZE];
@@ -148,23 +174,23 @@ public class TestHCatLoader {
         //"Edward Hyde\t1337\t(415-253-6367,anonymous@b44chan.org)\t{(CREATIVE_WRITING),(COPYRIGHT_LAW)},[CREATIVE_WRITING#A+,COPYRIGHT_LAW#D],{(415-253-6367,cell),(408-253-6367,landline)}",
       }
     );
-
     PigServer server = new PigServer(ExecType.LOCAL);
     server.setBatchOn();
-    server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);");
+    int i = 0;
+    server.registerQuery("A = load '" + BASIC_FILE_NAME + "' as (a:int, b:chararray);", ++i);
 
-    server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();");
-    server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();");
-    server.registerQuery("B = foreach A generate a,b;");
-    server.registerQuery("B2 = filter B by a < 2;");
-    server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');");
-
-    server.registerQuery("C = foreach A generate a,b;");
-    server.registerQuery("C2 = filter C by a >= 2;");
-    server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');");
+    server.registerQuery("store A into '" + BASIC_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
+    server.registerQuery("B = foreach A generate a,b;", ++i);
+    server.registerQuery("B2 = filter B by a < 2;", ++i);
+    server.registerQuery("store B2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=0');", ++i);
+    
+    server.registerQuery("C = foreach A generate a,b;", ++i);
+    server.registerQuery("C2 = filter C by a >= 2;", ++i);
+    server.registerQuery("store C2 into '" + PARTITIONED_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer('bkt=1');", ++i);
 
-    server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});");
-    server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();");
+    server.registerQuery("D = load '" + COMPLEX_FILE_NAME + "' as (name:chararray, studentid:int, contact:tuple(phno:chararray,email:chararray), currently_registered_courses:bag{innertup:tuple(course:chararray)}, current_grades:map[ ] , phnos :bag{innertup:tuple(phno:chararray,type:chararray)});", ++i);
+    server.registerQuery("store D into '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatStorer();", ++i);
     server.executeBatch();
 
   }
@@ -176,6 +202,7 @@ public class TestHCatLoader {
       dropTable(COMPLEX_TABLE);
       dropTable(PARTITIONED_TABLE);
       dropTable(SPECIFIC_SIZE_TABLE);
+      dropTable(AllTypesTable.ALL_PRIMITIVE_TYPES_TABLE);
     } finally {
       FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
     }
@@ -197,6 +224,20 @@ public class TestHCatLoader {
     assertTrue(Xfields.get(1).type == DataType.CHARARRAY);
 
   }
+  /**
+   * Test that we properly translate data types in Hive/HCat table schema into Pig schema
+   */
+  @Test
+  public void testSchemaLoadPrimitiveTypes() throws IOException {
+    AllTypesTable.testSchemaLoadPrimitiveTypes();
+  }
+  /**
+   * Test that values from the Hive table are read properly in Pig
+   */
+  @Test
+  public void testReadDataPrimitiveTypes() throws Exception {
+    AllTypesTable.testReadDataPrimitiveTypes();
+  }
 
   @Test
   public void testReadDataBasic() throws IOException {
@@ -450,4 +491,114 @@ public class TestHCatLoader {
     assertEquals(0, t.get(1));
     assertFalse(iterator.hasNext());
   }
+
+  /**
+   * basic tests that cover each scalar type 
+   * https://issues.apache.org/jira/browse/HIVE-5814
+   */
+  private static final class AllTypesTable {
+    private static final String ALL_TYPES_FILE_NAME = TEST_DATA_DIR + "/alltypes.input.data";
+    private static final String ALL_PRIMITIVE_TYPES_TABLE = "junit_unparted_alltypes";
+    private static final String ALL_TYPES_SCHEMA = "( c_boolean boolean, " +   //0
+        "c_tinyint tinyint, " +     //1
+        "c_smallint smallint, " +   //2
+        "c_int int, " +             //3
+        "c_bigint bigint, " +       //4
+        "c_float float, " +         //5
+        "c_double double, " +       //6
+        "c_decimal decimal(5,2), " +//7
+        "c_string string, " +       //8
+        "c_char char(10), " +       //9
+        "c_varchar varchar(20), " + //10
+        "c_binary binary, " +       //11
+        "c_date date, " +           //12
+        "c_timestamp timestamp)";   //13
+    /**
+     * raw data for #ALL_PRIMITIVE_TYPES_TABLE
+     * All the values are within range of target data type (column)
+     */
+    private static final Object[][] primitiveRows = new Object[][] {
+        {Boolean.TRUE,Byte.MAX_VALUE,Short.MAX_VALUE, Integer.MAX_VALUE,Long.MAX_VALUE,Float.MAX_VALUE,Double.MAX_VALUE,555.22,"Kyiv","char(10)xx","varchar(20)","blah".getBytes(),Date.valueOf("2014-01-13"),Timestamp.valueOf("2014-01-13 19:26:25.0123")},
+        {Boolean.FALSE,Byte.MIN_VALUE,Short.MIN_VALUE, Integer.MIN_VALUE,Long.MIN_VALUE,Float.MIN_VALUE,Double.MIN_VALUE,-555.22,"Saint Petersburg","char(xx)00","varchar(yy)","doh".getBytes(),Date.valueOf("2014-01-14"), Timestamp.valueOf("2014-01-14 19:26:25.0123")}
+    };
+    /**
+     * Test that we properly translate data types in Hive/HCat table schema into Pig schema
+     */
+    private static void testSchemaLoadPrimitiveTypes() throws IOException {
+      PigServer server = new PigServer(ExecType.LOCAL);
+      server.registerQuery("X = load '" + ALL_PRIMITIVE_TYPES_TABLE + "' using " + HCatLoader.class.getName() + "();");
+      Schema dumpedXSchema = server.dumpSchema("X");
+      List<FieldSchema> Xfields = dumpedXSchema.getFields();
+      assertEquals("Expected " + HCatFieldSchema.Type.numPrimitiveTypes() + " fields, found " +
+          Xfields.size(), HCatFieldSchema.Type.numPrimitiveTypes(), Xfields.size());
+      checkProjection(Xfields.get(0), "c_boolean", DataType.BOOLEAN);
+      checkProjection(Xfields.get(1), "c_tinyint", DataType.INTEGER);
+      checkProjection(Xfields.get(2), "c_smallint", DataType.INTEGER);
+      checkProjection(Xfields.get(3), "c_int", DataType.INTEGER);
+      checkProjection(Xfields.get(4), "c_bigint", DataType.LONG);
+      checkProjection(Xfields.get(5), "c_float", DataType.FLOAT);
+      checkProjection(Xfields.get(6), "c_double", DataType.DOUBLE);
+      checkProjection(Xfields.get(7), "c_decimal", DataType.BIGDECIMAL);
+      checkProjection(Xfields.get(8), "c_string", DataType.CHARARRAY);
+      checkProjection(Xfields.get(9), "c_char", DataType.CHARARRAY);
+      checkProjection(Xfields.get(10), "c_varchar", DataType.CHARARRAY);
+      checkProjection(Xfields.get(11), "c_binary", DataType.BYTEARRAY);
+      checkProjection(Xfields.get(12), "c_date", DataType.DATETIME);
+      checkProjection(Xfields.get(13), "c_timestamp", DataType.DATETIME);
+    }
+    /**
+     * Test that values from the Hive table are read properly in Pig
+     */
+    private static void testReadDataPrimitiveTypes() throws Exception {
+      PigServer server = new PigServer(ExecType.LOCAL);
+      server.registerQuery("X = load '" + ALL_PRIMITIVE_TYPES_TABLE + "' using " + HCatLoader.class.getName() + "();");
+      Iterator<Tuple> XIter = server.openIterator("X");
+      int numTuplesRead = 0;
+      while (XIter.hasNext()) {
+        Tuple t = XIter.next();
+        assertEquals(HCatFieldSchema.Type.numPrimitiveTypes(), t.size());
+        int colPos = 0;
+        for(Object referenceData : primitiveRows[numTuplesRead]) {
+          if(referenceData == null) {
+            assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data is null; actual " +
+                t.get(colPos), t.get(colPos) == null);
+          }
+          else if(referenceData instanceof java.util.Date) {
+            assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data=" + ((java.util.Date)referenceData).getTime() + " actual=" +
+                ((DateTime)t.get(colPos)).getMillis() + "; types=(" + referenceData.getClass() + "," + t.get(colPos).getClass() + ")",
+                ((java.util.Date)referenceData).getTime()== ((DateTime)t.get(colPos)).getMillis());
+            //note that here we ignore nanos part of Hive Timestamp since nanos are dropped when reading Hive from Pig by design
+          }
+          else {
+            assertTrue("rowNum=" + numTuplesRead + " colNum=" + colPos + " Reference data=" + referenceData + " actual=" +
+                t.get(colPos) + "; types=(" + referenceData.getClass() + "," + t.get(colPos).getClass() + ")",
+                referenceData.toString().equals(t.get(colPos).toString()));
+            //doing String comps here as value objects in Hive and Pig are different so equals() doesn't work
+          }
+          colPos++;
+        }
+        numTuplesRead++;
+      }
+      assertTrue("Expected " + primitiveRows.length + "; found " + numTuplesRead, numTuplesRead == primitiveRows.length);
+    }
+    private static void setupAllTypesTable(Driver driver) throws Exception {
+      String[] primitiveData = new String[primitiveRows.length];
+      for(int i = 0; i < primitiveRows.length; i++) {
+        Object[] rowData = primitiveRows[i];
+        StringBuilder row = new StringBuilder();
+        for(Object cell : rowData) {
+          row.append(row.length() == 0 ? "" : "\t").append(cell == null ? null : cell);
+        }
+        primitiveData[i] = row.toString();
+      }
+      HcatTestUtils.createTestDataFile(ALL_TYPES_FILE_NAME, primitiveData);
+      String cmd = "create table " + ALL_PRIMITIVE_TYPES_TABLE + ALL_TYPES_SCHEMA +
+          "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'" +
+          " STORED AS TEXTFILE";
+      executeStatementOnDriver(cmd, driver);
+      cmd = "load data local inpath '" + HCatUtil.makePathASafeFileName(ALL_TYPES_FILE_NAME) +
+          "' into table " + ALL_PRIMITIVE_TYPES_TABLE;
+      executeStatementOnDriver(cmd, driver);
+    }
+  }
 }

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoaderStorer.java Wed Feb  5 21:02:57 2014
@@ -134,7 +134,7 @@ public class TestHCatLoaderStorer extend
     server.registerQuery("data = load '" + data +
       "' using PigStorage('\t') as (my_small_int:int, my_tiny_int:int);");
     server.registerQuery(
-      "store data into 'test_tbl' using org.apache.hive.hcatalog.pig.HCatStorer();");
+      "store data into 'test_tbl' using org.apache.hive.hcatalog.pig.HCatStorer('','','-onOutOfRangeValue Throw');");
     List<ExecJob> jobs = server.executeBatch();
     Assert.assertEquals(expectedStatus, jobs.get(0).getStatus());
   }

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorer.java Wed Feb  5 21:02:57 2014
@@ -18,13 +18,19 @@
  */
 package org.apache.hive.hcatalog.pig;
 
+import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
 
 import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hive.hcatalog.HcatTestUtils;
 import org.apache.hive.hcatalog.mapreduce.HCatBaseTest;
 import org.apache.pig.EvalFunc;
@@ -35,13 +41,333 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.LogUtils;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestHCatStorer extends HCatBaseTest {
+  private static final Logger LOG = LoggerFactory.getLogger(TestHCatStorer.class);
 
   private static final String INPUT_FILE_NAME = TEST_DATA_DIR + "/input.data";
 
+  //Start: tests that check values from Pig that are out of range for target column
+  @Test
+  public void testWriteTinyint() throws Exception {
+    pigValueRangeTest("junitTypeTest1", "tinyint", "int", null, Integer.toString(1), Integer.toString(1));
+    pigValueRangeTestOverflow("junitTypeTest1", "tinyint", "int", null, Integer.toString(300));
+    pigValueRangeTestOverflow("junitTypeTest2", "tinyint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      Integer.toString(300));
+    pigValueRangeTestOverflow("junitTypeTest3", "tinyint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      Integer.toString(300));
+  }
+  @Test
+  public void testWriteSmallint() throws Exception {
+    pigValueRangeTest("junitTypeTest1", "smallint", "int", null, Integer.toString(Short.MIN_VALUE),
+      Integer.toString(Short.MIN_VALUE));
+    pigValueRangeTestOverflow("junitTypeTest2", "smallint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      Integer.toString(Short.MAX_VALUE + 1));
+    pigValueRangeTestOverflow("junitTypeTest3", "smallint", "int", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      Integer.toString(Short.MAX_VALUE + 1));
+  }
+  @Test
+  public void testWriteChar() throws Exception {
+    pigValueRangeTest("junitTypeTest1", "char(5)", "chararray", null, "xxx", "xxx  ");
+    pigValueRangeTestOverflow("junitTypeTest1", "char(5)", "chararray", null, "too_long");
+    pigValueRangeTestOverflow("junitTypeTest2", "char(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      "too_long");
+    pigValueRangeTestOverflow("junitTypeTest3", "char(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      "too_long2");
+  }
+  @Test
+  public void testWriteVarchar() throws Exception {
+    pigValueRangeTest("junitTypeTest1", "varchar(5)", "chararray", null, "xxx", "xxx");
+    pigValueRangeTestOverflow("junitTypeTest1", "varchar(5)", "chararray", null, "too_long");
+    pigValueRangeTestOverflow("junitTypeTest2", "varchar(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      "too_long");
+    pigValueRangeTestOverflow("junitTypeTest3", "varchar(5)", "chararray", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      "too_long2");
+  }
+  @Test
+  public void testWriteDecimalXY() throws Exception {
+    pigValueRangeTest("junitTypeTest1", "decimal(5,2)", "bigdecimal", null, BigDecimal.valueOf(1.2).toString(),
+      BigDecimal.valueOf(1.2).toString());
+    pigValueRangeTestOverflow("junitTypeTest1", "decimal(5,2)", "bigdecimal", null, BigDecimal.valueOf(12345.12).toString());
+    pigValueRangeTestOverflow("junitTypeTest2", "decimal(5,2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      BigDecimal.valueOf(500.123).toString());
+    pigValueRangeTestOverflow("junitTypeTest3", "decimal(5,2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      BigDecimal.valueOf(500.123).toString());
+  }
+  @Test
+  public void testWriteDecimalX() throws Exception {
+    //interestingly decimal(2) means decimal(2,0)
+    pigValueRangeTest("junitTypeTest1", "decimal(2)", "bigdecimal", null, BigDecimal.valueOf(12).toString(),
+      BigDecimal.valueOf(12).toString());
+    pigValueRangeTestOverflow("junitTypeTest2", "decimal(2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      BigDecimal.valueOf(50.123).toString());
+    pigValueRangeTestOverflow("junitTypeTest3", "decimal(2)", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      BigDecimal.valueOf(50.123).toString());
+  }
+  @Test
+  public void testWriteDecimal() throws Exception {
+    //decimal means decimal(10,0)
+    pigValueRangeTest("junitTypeTest1", "decimal", "bigdecimal", null, BigDecimal.valueOf(1234567890).toString(),
+      BigDecimal.valueOf(1234567890).toString());
+    pigValueRangeTestOverflow("junitTypeTest2", "decimal", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      BigDecimal.valueOf(12345678900L).toString());
+    pigValueRangeTestOverflow("junitTypeTest3", "decimal", "bigdecimal", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      BigDecimal.valueOf(12345678900L).toString());
+  }
+  /**
+   * Format used to compare DATE values: it omits the TZ that toString() includes,
+   * but keeps the time component so we can make sure it's 0.
+   */
+  private static final String FORMAT_4_DATE = "yyyy-MM-dd HH:mm:ss";
+  @Test
+  public void testWriteDate() throws Exception {
+    DateTime d = new DateTime(1991,10,11,0,0);
+    pigValueRangeTest("junitTypeTest1", "date", "datetime", null, d.toString(),
+      d.toString(FORMAT_4_DATE), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.plusHours(2).toString(), FORMAT_4_DATE);//time != 0
+    pigValueRangeTestOverflow("junitTypeTest3", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      d.plusMinutes(1).toString(), FORMAT_4_DATE);//time != 0
+    d = new DateTime(1991,10,11,0,0,DateTimeZone.forOffsetHours(-11));
+    pigValueRangeTest("junitTypeTest4", "date", "datetime", null, d.toString(),
+      d.toString(FORMAT_4_DATE), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest5", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.plusHours(2).toString(), FORMAT_4_DATE);//date out of range due to time != 0
+    pigValueRangeTestOverflow("junitTypeTest6", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      d.plusMinutes(1).toString(), FORMAT_4_DATE);//date out of range due to time!=0
+  }
+  @Test
+  public void testWriteDate3() throws Exception {
+    DateTime d = new DateTime(1991,10,11,23,10,DateTimeZone.forOffsetHours(-11));
+    FrontendException fe = null;
+    //expect to fail since the time component is not 0
+    pigValueRangeTestOverflow("junitTypeTest4", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      d.toString(), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest5", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.plusHours(2).toString(), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest6", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      d.plusMinutes(1).toString(), FORMAT_4_DATE);
+  }
+  @Test
+  public void testWriteDate2() throws Exception {
+    DateTime d = new DateTime(1991,11,12,0,0, DateTimeZone.forID("US/Eastern"));
+    pigValueRangeTest("junitTypeTest1", "date", "datetime", null, d.toString(),
+      d.toString(FORMAT_4_DATE), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.plusHours(2).toString(), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.plusMillis(20).toString(), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest2", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      d.plusMillis(12).toString(), FORMAT_4_DATE);
+    pigValueRangeTestOverflow("junitTypeTest3", "date", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Throw,
+      d.plusMinutes(1).toString(), FORMAT_4_DATE);
+  }
+  /**
+   * Note that the value that comes back from Hive will have local TZ on it.  Using local is 
+   * arbitrary but DateTime needs TZ (or will assume default) and Hive does not have TZ.
+   * So if you start with Pig value in TZ=x and write to Hive, when you read it back the TZ may
+   * be different.  The millis value should match, of course.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testWriteTimestamp() throws Exception {
+    DateTime d = new DateTime(1991,10,11,14,23,30, 10);//uses default TZ
+    pigValueRangeTest("junitTypeTest1", "timestamp", "datetime", null, d.toString(), 
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+    d = d.plusHours(2);
+    pigValueRangeTest("junitTypeTest2", "timestamp", "datetime", HCatBaseStorer.OOR_VALUE_OPT_VALUES.Null,
+      d.toString(), d.toDateTime(DateTimeZone.getDefault()).toString());
+    d = d.toDateTime(DateTimeZone.UTC);
+    pigValueRangeTest("junitTypeTest3", "timestamp", "datetime", null, d.toString(), 
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+
+    d = new DateTime(1991,10,11,23,24,25, 26);
+    pigValueRangeTest("junitTypeTest1", "timestamp", "datetime", null, d.toString(), 
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+    d = d.toDateTime(DateTimeZone.UTC);
+    pigValueRangeTest("junitTypeTest3", "timestamp", "datetime", null, d.toString(), 
+      d.toDateTime(DateTimeZone.getDefault()).toString());
+  }
+  //End: tests that check values from Pig that are out of range for target column
+
+
+  private void pigValueRangeTestOverflow(String tblName, String hiveType, String pigType,
+    HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, String format) throws Exception {
+    pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, null, format);
+  }
+  private void pigValueRangeTestOverflow(String tblName, String hiveType, String pigType,
+                                 HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue) throws Exception {
+    pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, null, null);
+  }
+  private void pigValueRangeTest(String tblName, String hiveType, String pigType,
+                                 HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, 
+                                 String expectedValue) throws Exception {
+    pigValueRangeTest(tblName, hiveType, pigType, goal, inputValue, expectedValue, null);
+  }
+  /**
+   * This is used to test how Pig values of various data types which are out of range for Hive target
+   * column are handled.  Currently the options are to raise an error or write NULL.
+   * 1. create a data file with 1 column, 1 row
+   * 2. load into pig
+   * 3. use pig to store into Hive table
+   * 4. read from Hive table using Pig
+   * 5. check that read value is what is expected
+   * @param tblName Hive table name to create
+   * @param hiveType datatype to use for the single column in table
+   * @param pigType corresponding Pig type when loading file into Pig
+   * @param goal how out-of-range values from Pig are handled by HCat, may be {@code null}
+   * @param inputValue written to file which is read by Pig, thus must be something Pig can read
+   *                   (e.g. DateTime.toString(), rather than java.sql.Date)
+   * @param expectedValue what Pig should see when reading Hive table
+   * @param format date format to use for comparison of values since default DateTime.toString()
+   *               includes TZ which is meaningless for Hive DATE type
+   */
+  private void pigValueRangeTest(String tblName, String hiveType, String pigType, 
+                                 HCatBaseStorer.OOR_VALUE_OPT_VALUES goal, String inputValue, String expectedValue, String format)
+    throws Exception {
+    TestHCatLoader.dropTable(tblName, driver);
+    final String field = "f1";
+    TestHCatLoader.createTable(tblName, field + " " + hiveType, null, driver, "RCFILE");
+    HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, new String[] {inputValue});
+    LOG.debug("File=" + INPUT_FILE_NAME);
+    dumpFile(INPUT_FILE_NAME);
+    PigServer server = createPigServer(true);
+    int queryNumber = 1;
+    logAndRegister(server,
+      "A = load '" + INPUT_FILE_NAME + "' as (" + field + ":" + pigType + ");", queryNumber++);
+    Iterator<Tuple> firstLoad = server.openIterator("A");
+    if(goal == null) {
+      logAndRegister(server,
+        "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "();", queryNumber++);
+    }
+    else {
+      FrontendException fe = null;
+      try {
+        logAndRegister(server,
+          "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "('','','-" +
+          HCatStorer.ON_OOR_VALUE_OPT + " " + goal + "');",
+          queryNumber++);
+      }
+      catch(FrontendException e) {
+        fe = e;
+      }
+      switch (goal) {
+        case Null:
+          //do nothing, fall through and verify the data
+          break;
+        case Throw:
+          Assert.assertTrue("Expected a FrontendException", fe != null);
+          Assert.assertEquals("Expected a different FrontendException.", fe.getMessage(), "Unable to store alias A");
+          return;//this test is done
+        default:
+          Assert.assertFalse("Unexpected goal: " + goal, 1 == 1);
+      }
+    }
+    logAndRegister(server, "B = load '" + tblName + "' using " + HCatLoader.class.getName() + "();", queryNumber);
+    CommandProcessorResponse cpr = driver.run("select * from " + tblName);
+    LOG.debug("cpr.respCode=" + cpr.getResponseCode() + " cpr.errMsg=" + cpr.getErrorMessage() +
+      " for table " + tblName);
+    List l = new ArrayList();
+    driver.getResults(l);
+    LOG.debug("Dumping rows via SQL from " + tblName);
+    for(Object t : l) {
+      LOG.debug(t == null ? null : t.toString() + " t.class=" + t.getClass());
+    }
+    Iterator<Tuple> itr = server.openIterator("B");
+    int numRowsRead = 0;
+    while(itr.hasNext()) {
+      Tuple t = itr.next();
+      if("date".equals(hiveType)) {
+        DateTime dateTime = (DateTime)t.get(0);
+        Assert.assertTrue(format != null);
+        Assert.assertEquals("Comparing Pig to Raw data for table " + tblName, expectedValue, dateTime== null ? null : dateTime.toString(format));
+      }
+      else {
+        Assert.assertEquals("Comparing Pig to Raw data for table " + tblName, expectedValue, t.isNull(0) ? null : t.get(0).toString());
+      }
+      //see comment at "Dumping rows via SQL..." for why this doesn't work
+      //Assert.assertEquals("Comparing Pig to Hive", t.get(0), l.get(0));
+      numRowsRead++;
+    }
+    Assert.assertEquals("Expected " + 1 + " rows; got " + numRowsRead + " file=" + INPUT_FILE_NAME + "; table " +
+      tblName, 1, numRowsRead);
+    /* Misc notes:
+    Unfortunately Timestamp.toString() adjusts the value for local TZ and 't' is a String
+    thus the timestamp in 't' doesn't match rawData*/
+  }
+  /**
+   * Create a data file with datatypes added in 0.13.  Read it with Pig and use
+   * Pig + HCatStorer to write to a Hive table.  Then read it using Pig and Hive
+   * and make sure results match.
+   */
+  @Test
+  public void testDateCharTypes() throws Exception {
+    final String tblName = "junit_date_char";
+    TestHCatLoader.dropTable(tblName, driver);
+    TestHCatLoader.createTable(tblName,
+      "id int, char5 char(5), varchar10 varchar(10), dec52 decimal(5,2)", null, driver, "RCFILE");
+    int NUM_ROWS = 5;
+    String[] rows = new String[NUM_ROWS];
+    for(int i = 0; i < NUM_ROWS; i++) {
+      //since the file is read by Pig, we need to make sure the values are in format that Pig understands
+      //otherwise it will turn the value to NULL on read
+      rows[i] = i + "\txxxxx\tyyy\t" + 5.2;
+    }
+    HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, rows);
+    LOG.debug("File=" + INPUT_FILE_NAME);
+//    dumpFile(INPUT_FILE_NAME);
+    PigServer server = createPigServer(true);
+    int queryNumber = 1;
+    logAndRegister(server,
+      "A = load '" + INPUT_FILE_NAME + "' as (id:int, char5:chararray, varchar10:chararray, dec52:bigdecimal);",
+      queryNumber++);
+    logAndRegister(server,
+      "store A into '" + tblName + "' using " + HCatStorer.class.getName() + "();", queryNumber++);
+    logAndRegister(server, "B = load '" + tblName + "' using " + HCatLoader.class.getName() + "();",
+      queryNumber);
+    CommandProcessorResponse cpr = driver.run("select * from " + tblName);
+    LOG.debug("cpr.respCode=" + cpr.getResponseCode() + " cpr.errMsg=" + cpr.getErrorMessage());
+    List l = new ArrayList();
+    driver.getResults(l);
+    LOG.debug("Dumping rows via SQL from " + tblName);
+      /*Unfortunately Timestamp.toString() adjusts the value for local TZ and 't' is a String
+      * thus the timestamp in 't' doesn't match rawData*/
+    for(Object t : l) {
+      LOG.debug(t == null ? null : t.toString());
+    }
+    Iterator<Tuple> itr = server.openIterator("B");
+    int numRowsRead = 0;
+    while (itr.hasNext()) {
+      Tuple t = itr.next();
+      StringBuilder rowFromPig = new StringBuilder();
+      for(int i = 0; i < t.size(); i++) {
+        rowFromPig.append(t.get(i)).append("\t");
+      }
+      rowFromPig.setLength(rowFromPig.length() - 1);
+      Assert.assertEquals("Comparing Pig to Raw data", rowFromPig.toString(), rows[numRowsRead]);
+      //see comment at "Dumping rows via SQL..." for why this doesn't work (for all types)
+      //Assert.assertEquals("Comparing Pig to Hive", rowFromPig.toString(), l.get(numRowsRead));
+      numRowsRead++;
+    }
+    Assert.assertEquals("Expected " + NUM_ROWS + " rows; got " + numRowsRead + " file=" + INPUT_FILE_NAME, NUM_ROWS, numRowsRead);
+  }
+  private static void dumpFile(String fileName) throws Exception {
+    File file = new File(fileName);
+    BufferedReader reader = new BufferedReader(new FileReader(file));
+    String line = null;
+    LOG.debug("Dumping raw file: " + fileName);
+    while((line = reader.readLine()) != null) {
+      LOG.debug(line);
+    }
+    reader.close();
+  }
   @Test
   public void testPartColsInData() throws IOException, CommandNeedRetryException {
 
@@ -74,7 +400,7 @@ public class TestHCatStorer extends HCat
     }
 
     Assert.assertFalse(itr.hasNext());
-    Assert.assertEquals(11, i);
+    Assert.assertEquals(LOOP_SIZE, i);
   }
 
   @Test
@@ -597,7 +923,7 @@ public class TestHCatStorer extends HCat
     Assert.assertEquals(0, results.size());
     driver.run("drop table employee");
   }
-
+  @Test
   public void testPartitionPublish()
     throws IOException, CommandNeedRetryException {
 
@@ -619,9 +945,9 @@ public class TestHCatStorer extends HCat
     server.registerQuery("A = load '" + INPUT_FILE_NAME
         + "' as (a:int, c:chararray);");
     server.registerQuery("B = filter A by " + FailEvalFunc.class.getName()
-        + "($0);");
+      + "($0);");
     server.registerQuery("store B into 'ptn_fail' using "
-        + HCatStorer.class.getName() + "('b=math');");
+      + HCatStorer.class.getName() + "('b=math');");
     server.executeBatch();
 
     String query = "show partitions ptn_fail";
@@ -639,7 +965,7 @@ public class TestHCatStorer extends HCat
     // Make sure the partitions directory is not in hdfs.
     Assert.assertTrue((new File(TEST_WAREHOUSE_DIR + "/ptn_fail")).exists());
     Assert.assertFalse((new File(TEST_WAREHOUSE_DIR + "/ptn_fail/b=math"))
-        .exists());
+      .exists());
   }
 
   static public class FailEvalFunc extends EvalFunc<Boolean> {

Modified: hive/trunk/itests/hcatalog-unit/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/hcatalog-unit/pom.xml?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/itests/hcatalog-unit/pom.xml (original)
+++ hive/trunk/itests/hcatalog-unit/pom.xml Wed Feb  5 21:02:57 2014
@@ -192,6 +192,14 @@
           <version>${pig.version}</version>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <!--This should be automatically brought in by Pig, which requires it; it's not in Pig 0.12
+              due to a bug in Pig. This is fixed in Pig's pom file in ASF trunk (pig 13).-->
+          <groupId>joda-time</groupId>
+          <artifactId>joda-time</artifactId>
+          <version>2.2</version>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
    <profile>
@@ -332,6 +340,14 @@
           <classifier>h2</classifier>
           <scope>test</scope>
         </dependency>
+        <dependency>
+          <!--This should be automatically brought in by Pig, which requires it; it's not in Pig 0.12
+              due to a bug in Pig. This is fixed in Pig's pom file in ASF trunk (pig 13).-->
+          <groupId>joda-time</groupId>
+          <artifactId>joda-time</artifactId>
+          <version>2.2</version>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
   </profiles>

Modified: hive/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/pom.xml?rev=1564924&r1=1564923&r2=1564924&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/trunk/pom.xml Wed Feb  5 21:02:57 2014
@@ -127,7 +127,7 @@
         requires netty < 3.6.0 we force hadoops version
       -->
     <netty.version>3.4.0.Final</netty.version>
-    <pig.version>0.10.1</pig.version>
+    <pig.version>0.12.0</pig.version>
     <protobuf.version>2.5.0</protobuf.version>
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>



Mime
View raw message