incubator-hcatalog-commits mailing list archives

From: tra...@apache.org
Subject: svn commit: r1383152 [3/27] - in /incubator/hcatalog/trunk: ./ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/ hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/ hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/ s...
Date: Mon, 10 Sep 2012 23:29:03 GMT
Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java Mon Sep 10 23:28:55 2012
@@ -56,388 +56,388 @@ import org.apache.pig.impl.util.UDFConte
 
 public class PigHCatUtil {
 
-  static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes
-  private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+    static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes
+    private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME;
 
-  private final  Map<Pair<String,String>, Table> hcatTableCache =
-    new HashMap<Pair<String,String>, Table>();
+    private final Map<Pair<String, String>, Table> hcatTableCache =
+        new HashMap<Pair<String, String>, Table>();
 
-  private static final TupleFactory tupFac = TupleFactory.getInstance();
+    private static final TupleFactory tupFac = TupleFactory.getInstance();
 
-  static public Pair<String, String> getDBTableNames(String location) throws IOException {
-    // the location string will be of the form:
-    // <database name>.<table name> - parse it and
-    // communicate the information to HCatInputFormat
-
-    try {
-      return HCatUtil.getDbAndTableName(location);
-    } catch (IOException e) {
-      String locationErrMsg = "The input location in load statement " +
-      "should be of the form " +
-      "<databasename>.<table name> or <table name>. Got " + location;
-      throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE);
-    }
-  }
-
-  static public String getHCatServerUri(Job job) {
-
-    return job.getConfiguration().get(HiveConf.ConfVars.METASTOREURIS.varname);
-  }
-
-  static public String getHCatServerPrincipal(Job job) {
-
-    return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
-  }
-
-  private static HiveMetaStoreClient getHiveMetaClient(String serverUri,
-                                                       String serverKerberosPrincipal, Class<?> clazz) throws Exception {
-    HiveConf hiveConf = new HiveConf(clazz);
-
-    if (serverUri != null){
-      hiveConf.set("hive.metastore.local", "false");
-      hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
-    }
-
-    if (serverKerberosPrincipal != null){
-      hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
-      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
-    }
-
-    try {
-        return HCatUtil.getHiveClient(hiveConf);
-    } catch (Exception e){
-      throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:["+serverUri+"]",e);
-    }
-  }
-
-
-  HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException {
-    if(fields == null) {
-      return null;
-    }
-
-    Properties props = UDFContext.getUDFContext().getUDFProperties(
-        classForUDFCLookup, new String[] {signature});
-    HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
-
-    ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
-    for(RequiredField rf: fields) {
-      fcols.add(hcatTableSchema.getFields().get(rf.getIndex()));
-    }
-    return new HCatSchema(fcols);
-  }
-
-  public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException{
-    Pair<String, String> loc_server = new Pair<String,String>(location, hcatServerUri);
-    Table hcatTable = hcatTableCache.get(loc_server);
-    if(hcatTable != null){
-      return hcatTable;
-    }
-
-    Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
-    String dbName = dbTablePair.first;
-    String tableName = dbTablePair.second;
-    Table table = null;
-    HiveMetaStoreClient client = null;
-    try {
-      client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
-      table = HCatUtil.getTable(client, dbName, tableName);
-    } catch (NoSuchObjectException nsoe){
-      throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
-    } catch (Exception e) {
-      throw new IOException(e);
-    } finally {
-        HCatUtil.closeHiveClientQuietly(client);
-    }
-    hcatTableCache.put(loc_server, table);
-    return table;
-  }
-
-  public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOException {
-
-    List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>();
-    for (HCatFieldSchema hfs : hcatSchema.getFields()){
-      ResourceFieldSchema rfSchema;
-      rfSchema = getResourceSchemaFromFieldSchema(hfs);
-      rfSchemaList.add(rfSchema);
-    }
-    ResourceSchema rSchema = new ResourceSchema();
-    rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
-    return rSchema;
-
-  }
-
-  private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs)
-      throws IOException {
-    ResourceFieldSchema rfSchema;
-    // if we are dealing with a bag or tuple column - need to worry about subschema
-    if(hfs.getType() == Type.STRUCT) {
-        rfSchema = new ResourceFieldSchema()
-          .setName(hfs.getName())
-          .setDescription(hfs.getComment())
-          .setType(getPigType( hfs))
-          .setSchema(getTupleSubSchema(hfs));
-    } else if(hfs.getType() == Type.ARRAY) {
-        rfSchema = new ResourceFieldSchema()
-          .setName(hfs.getName())
-          .setDescription(hfs.getComment())
-          .setType(getPigType( hfs))
-          .setSchema(getBagSubSchema(hfs));
-    } else {
-      rfSchema = new ResourceFieldSchema()
-          .setName(hfs.getName())
-          .setDescription(hfs.getComment())
-          .setType(getPigType( hfs))
-          .setSchema(null); // no munging inner-schemas
-    }
-    return rfSchema;
-  }
-
-  protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException {
-    // there are two cases - array<Type> and array<struct<...>>
-    // in either case the element type of the array is represented in a
-    // tuple field schema in the bag's field schema - the second case (struct)
-    // more naturally translates to the tuple - in the first case (array<Type>)
-    // we simulate the tuple by putting the single field in a tuple
-
-    Properties props = UDFContext.getUDFContext().getClientSystemProps();
-    String innerTupleName = HCatConstants.HCAT_PIG_INNER_TUPLE_NAME_DEFAULT;
-    if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)) {
-      innerTupleName = props.getProperty(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)
-          .replaceAll("FIELDNAME", hfs.getName());
-    }
-    String innerFieldName = HCatConstants.HCAT_PIG_INNER_FIELD_NAME_DEFAULT;
-    if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)) {
-      innerFieldName = props.getProperty(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)
-          .replaceAll("FIELDNAME", hfs.getName());
-    }
-
-    ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1];
-    bagSubFieldSchemas[0] = new ResourceFieldSchema().setName(innerTupleName)
-      .setDescription("The tuple in the bag")
-      .setType(DataType.TUPLE);
-    HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
-    if(arrayElementFieldSchema.getType() == Type.STRUCT) {
-      bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
-    } else if(arrayElementFieldSchema.getType() == Type.ARRAY) {
-      ResourceSchema s = new ResourceSchema();
-      List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
-      s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
-      bagSubFieldSchemas[0].setSchema(s);
-    } else {
-      ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
-      innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName(innerFieldName)
-        .setDescription("The inner field in the tuple in the bag")
-        .setType(getPigType(arrayElementFieldSchema))
-        .setSchema(null); // the element type is not a tuple - so no subschema
-      bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
-    }
-    ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
-    return s;
-
-  }
-
-  private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException {
-    // for each struct subfield, create equivalent ResourceFieldSchema
-    ResourceSchema s = new ResourceSchema();
-    List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>();
-    for(HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
-      lrfs.add(getResourceSchemaFromFieldSchema(subField));
-    }
-    s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
-    return s;
-  }
-
-/**
-   * @param hfs the field schema of the column
-   * @return corresponding pig type
-   * @throws IOException
-   */
-  static public byte getPigType(HCatFieldSchema hfs) throws IOException {
-    return getPigType(hfs.getType());
-  }
-
-  static public byte getPigType(Type type) throws IOException {
-    String errMsg;
-
-    if (type == Type.STRING){
-      return DataType.CHARARRAY;
-    }
-
-    if ( (type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)){
-      return DataType.INTEGER;
-    }
-
-    if (type == Type.ARRAY){
-      return DataType.BAG;
-    }
-
-    if (type == Type.STRUCT){
-      return DataType.TUPLE;
-    }
-
-    if (type == Type.MAP){
-      return DataType.MAP;
-    }
-
-    if (type == Type.BIGINT){
-      return DataType.LONG;
-    }
-
-    if (type == Type.FLOAT){
-      return DataType.FLOAT;
-    }
-
-    if (type == Type.DOUBLE){
-      return DataType.DOUBLE;
-    }
-
-    if (type == Type.BINARY){
-        return DataType.BYTEARRAY;
-    }
-
-    if (type == Type.BOOLEAN){
-      errMsg = "HCatalog column type 'BOOLEAN' is not supported in " +
-      "Pig as a column type";
-      throw new PigException(errMsg, PIG_EXCEPTION_CODE);
-    }
-
-    errMsg = "HCatalog column type '"+ type.toString() +"' is not supported in Pig as a column type";
-    throw new PigException(errMsg, PIG_EXCEPTION_CODE);
-  }
-
-  public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
-    if (hr == null){
-      return null;
-    }
-    return transformToTuple(hr.getAll(),hs);
-  }
-
-  @SuppressWarnings("unchecked")
-  public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
-    Object result;
-    Type itemType = hfs.getType();
-    switch (itemType){
-    case BINARY:
-      result = (o == null) ? null : new DataByteArray((byte[])o);
-      break;
-    case STRUCT:
-      result = transformToTuple((List<Object>)o,hfs);
-      break;
-    case ARRAY:
-      result = transformToBag((List<? extends Object>) o,hfs);
-      break;
-    case MAP:
-      result = transformToPigMap((Map<String, Object>)o,hfs);
-      break;
-    default:
-      result = o;
-      break;
-    }
-    return result;
-  }
-
-  public static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
-      try {
-          return transformToTuple(objList,hfs.getStructSubSchema());
-      } catch (Exception e){
-          if (hfs.getType() != Type.STRUCT){
-              throw new Exception("Expected Struct type, got "+hfs.getType(), e);
-          } else {
-              throw e;
-          }
-      }
-  }
-
-  public static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
-    if (objList == null){
-      return null;
-    }
-    Tuple t = tupFac.newTuple(objList.size());
-    List<HCatFieldSchema> subFields = hs.getFields();
-    for (int i = 0; i < subFields.size(); i++){
-      t.set(i,extractPigObject(objList.get(i), subFields.get(i)));
-    }
-    return t;
-  }
-
-  public static Map<String,Object> transformToPigMap(Map<String,Object> map, HCatFieldSchema hfs) throws Exception {
-    if (map == null) {
-      return null;
-    }
-
-    Map<String,Object> result = new HashMap<String, Object>();
-    for (Entry<String, Object> entry : map.entrySet()) {
-      result.put(entry.getKey(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
-    }
-    return result;
-  }
-
-  @SuppressWarnings("unchecked")
-  public static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
-    if (list == null){
-      return null;
-    }
-
-    HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
-    DataBag db = new DefaultDataBag();
-    for (Object o : list){
-      Tuple tuple;
-      if (elementSubFieldSchema.getType() == Type.STRUCT){
-        tuple = transformToTuple((List<Object>)o, elementSubFieldSchema);
-      } else {
-        // bags always contain tuples
-        tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
-      }
-      db.add(tuple);
-    }
-    return db;
-  }
-
-
-  private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
-    for(HCatFieldSchema hcatField : tblSchema.getFields()){
-      validateHcatFieldFollowsPigRules(hcatField);
-    }
-  }
-
-  private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
-    try {
-      Type hType = hcatField.getType();
-      switch(hType){
-      case BOOLEAN:
-        throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
-      case ARRAY:
-        validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
-        break;
-      case STRUCT:
-        validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
-        break;
-      case MAP:
-        // key is only string
-        validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
-        break;
-      }
-    } catch (HCatException e) {
-      throw new PigException("Incompatible type found in hcat table schema: "+hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
-    }
-  }
-
-
-  public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
-    validateHCatSchemaFollowsPigRules(hcatTableSchema);
-  }
-
-  public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
-    if(p.getProperty(propName) != null){
-      config.set(propName, p.getProperty(propName));
-    }
-  }
-
-  public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
-    if(config.get(propName) != null){
-      p.setProperty(propName, config.get(propName));
+    static public Pair<String, String> getDBTableNames(String location) throws IOException {
+        // the location string will be of the form:
+        // <database name>.<table name> - parse it and
+        // communicate the information to HCatInputFormat
+
+        try {
+            return HCatUtil.getDbAndTableName(location);
+        } catch (IOException e) {
+            String locationErrMsg = "The input location in load statement " +
+                "should be of the form " +
+                "<databasename>.<table name> or <table name>. Got " + location;
+            throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE);
+        }
+    }
+
+    static public String getHCatServerUri(Job job) {
+
+        return job.getConfiguration().get(HiveConf.ConfVars.METASTOREURIS.varname);
+    }
+
+    static public String getHCatServerPrincipal(Job job) {
+
+        return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+    }
+
+    private static HiveMetaStoreClient getHiveMetaClient(String serverUri,
+                                                         String serverKerberosPrincipal, Class<?> clazz) throws Exception {
+        HiveConf hiveConf = new HiveConf(clazz);
+
+        if (serverUri != null) {
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
+        }
+
+        if (serverKerberosPrincipal != null) {
+            hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+            hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
+        }
+
+        try {
+            return HCatUtil.getHiveClient(hiveConf);
+        } catch (Exception e) {
+            throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:[" + serverUri + "]", e);
+        }
+    }
+
+
+    HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException {
+        if (fields == null) {
+            return null;
+        }
+
+        Properties props = UDFContext.getUDFContext().getUDFProperties(
+            classForUDFCLookup, new String[]{signature});
+        HCatSchema hcatTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
+
+        ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+        for (RequiredField rf : fields) {
+            fcols.add(hcatTableSchema.getFields().get(rf.getIndex()));
+        }
+        return new HCatSchema(fcols);
+    }
+
+    public Table getTable(String location, String hcatServerUri, String hcatServerPrincipal) throws IOException {
+        Pair<String, String> loc_server = new Pair<String, String>(location, hcatServerUri);
+        Table hcatTable = hcatTableCache.get(loc_server);
+        if (hcatTable != null) {
+            return hcatTable;
+        }
+
+        Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+        String dbName = dbTablePair.first;
+        String tableName = dbTablePair.second;
+        Table table = null;
+        HiveMetaStoreClient client = null;
+        try {
+            client = getHiveMetaClient(hcatServerUri, hcatServerPrincipal, PigHCatUtil.class);
+            table = HCatUtil.getTable(client, dbName, tableName);
+        } catch (NoSuchObjectException nsoe) {
+            throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            HCatUtil.closeHiveClientQuietly(client);
+        }
+        hcatTableCache.put(loc_server, table);
+        return table;
+    }
+
+    public static ResourceSchema getResourceSchema(HCatSchema hcatSchema) throws IOException {
+
+        List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>();
+        for (HCatFieldSchema hfs : hcatSchema.getFields()) {
+            ResourceFieldSchema rfSchema;
+            rfSchema = getResourceSchemaFromFieldSchema(hfs);
+            rfSchemaList.add(rfSchema);
+        }
+        ResourceSchema rSchema = new ResourceSchema();
+        rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
+        return rSchema;
+
+    }
+
+    private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs)
+        throws IOException {
+        ResourceFieldSchema rfSchema;
+        // if we are dealing with a bag or tuple column - need to worry about subschema
+        if (hfs.getType() == Type.STRUCT) {
+            rfSchema = new ResourceFieldSchema()
+                .setName(hfs.getName())
+                .setDescription(hfs.getComment())
+                .setType(getPigType(hfs))
+                .setSchema(getTupleSubSchema(hfs));
+        } else if (hfs.getType() == Type.ARRAY) {
+            rfSchema = new ResourceFieldSchema()
+                .setName(hfs.getName())
+                .setDescription(hfs.getComment())
+                .setType(getPigType(hfs))
+                .setSchema(getBagSubSchema(hfs));
+        } else {
+            rfSchema = new ResourceFieldSchema()
+                .setName(hfs.getName())
+                .setDescription(hfs.getComment())
+                .setType(getPigType(hfs))
+                .setSchema(null); // no munging inner-schemas
+        }
+        return rfSchema;
+    }
+
+    protected static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException {
+        // there are two cases - array<Type> and array<struct<...>>
+        // in either case the element type of the array is represented in a
+        // tuple field schema in the bag's field schema - the second case (struct)
+        // more naturally translates to the tuple - in the first case (array<Type>)
+        // we simulate the tuple by putting the single field in a tuple
+
+        Properties props = UDFContext.getUDFContext().getClientSystemProps();
+        String innerTupleName = HCatConstants.HCAT_PIG_INNER_TUPLE_NAME_DEFAULT;
+        if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)) {
+            innerTupleName = props.getProperty(HCatConstants.HCAT_PIG_INNER_TUPLE_NAME)
+                .replaceAll("FIELDNAME", hfs.getName());
+        }
+        String innerFieldName = HCatConstants.HCAT_PIG_INNER_FIELD_NAME_DEFAULT;
+        if (props != null && props.containsKey(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)) {
+            innerFieldName = props.getProperty(HCatConstants.HCAT_PIG_INNER_FIELD_NAME)
+                .replaceAll("FIELDNAME", hfs.getName());
+        }
+
+        ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1];
+        bagSubFieldSchemas[0] = new ResourceFieldSchema().setName(innerTupleName)
+            .setDescription("The tuple in the bag")
+            .setType(DataType.TUPLE);
+        HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
+        if (arrayElementFieldSchema.getType() == Type.STRUCT) {
+            bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
+        } else if (arrayElementFieldSchema.getType() == Type.ARRAY) {
+            ResourceSchema s = new ResourceSchema();
+            List<ResourceFieldSchema> lrfs = Arrays.asList(getResourceSchemaFromFieldSchema(arrayElementFieldSchema));
+            s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+            bagSubFieldSchemas[0].setSchema(s);
+        } else {
+            ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
+            innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName(innerFieldName)
+                .setDescription("The inner field in the tuple in the bag")
+                .setType(getPigType(arrayElementFieldSchema))
+                .setSchema(null); // the element type is not a tuple - so no subschema
+            bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
+        }
+        ResourceSchema s = new ResourceSchema().setFields(bagSubFieldSchemas);
+        return s;
+
+    }
+
+    private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException {
+        // for each struct subfield, create equivalent ResourceFieldSchema
+        ResourceSchema s = new ResourceSchema();
+        List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>();
+        for (HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
+            lrfs.add(getResourceSchemaFromFieldSchema(subField));
+        }
+        s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+        return s;
+    }
+
+    /**
+     * @param hfs the field schema of the column
+     * @return corresponding pig type
+     * @throws IOException
+     */
+    static public byte getPigType(HCatFieldSchema hfs) throws IOException {
+        return getPigType(hfs.getType());
+    }
+
+    static public byte getPigType(Type type) throws IOException {
+        String errMsg;
+
+        if (type == Type.STRING) {
+            return DataType.CHARARRAY;
+        }
+
+        if ((type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)) {
+            return DataType.INTEGER;
+        }
+
+        if (type == Type.ARRAY) {
+            return DataType.BAG;
+        }
+
+        if (type == Type.STRUCT) {
+            return DataType.TUPLE;
+        }
+
+        if (type == Type.MAP) {
+            return DataType.MAP;
+        }
+
+        if (type == Type.BIGINT) {
+            return DataType.LONG;
+        }
+
+        if (type == Type.FLOAT) {
+            return DataType.FLOAT;
+        }
+
+        if (type == Type.DOUBLE) {
+            return DataType.DOUBLE;
+        }
+
+        if (type == Type.BINARY) {
+            return DataType.BYTEARRAY;
+        }
+
+        if (type == Type.BOOLEAN) {
+            errMsg = "HCatalog column type 'BOOLEAN' is not supported in " +
+                "Pig as a column type";
+            throw new PigException(errMsg, PIG_EXCEPTION_CODE);
+        }
+
+        errMsg = "HCatalog column type '" + type.toString() + "' is not supported in Pig as a column type";
+        throw new PigException(errMsg, PIG_EXCEPTION_CODE);
+    }
+
+    public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
+        if (hr == null) {
+            return null;
+        }
+        return transformToTuple(hr.getAll(), hs);
+    }
+
+    @SuppressWarnings("unchecked")
+    public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
+        Object result;
+        Type itemType = hfs.getType();
+        switch (itemType) {
+        case BINARY:
+            result = (o == null) ? null : new DataByteArray((byte[]) o);
+            break;
+        case STRUCT:
+            result = transformToTuple((List<Object>) o, hfs);
+            break;
+        case ARRAY:
+            result = transformToBag((List<? extends Object>) o, hfs);
+            break;
+        case MAP:
+            result = transformToPigMap((Map<String, Object>) o, hfs);
+            break;
+        default:
+            result = o;
+            break;
+        }
+        return result;
+    }
+
+    public static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
+        try {
+            return transformToTuple(objList, hfs.getStructSubSchema());
+        } catch (Exception e) {
+            if (hfs.getType() != Type.STRUCT) {
+                throw new Exception("Expected Struct type, got " + hfs.getType(), e);
+            } else {
+                throw e;
+            }
+        }
+    }
+
+    public static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
+        if (objList == null) {
+            return null;
+        }
+        Tuple t = tupFac.newTuple(objList.size());
+        List<HCatFieldSchema> subFields = hs.getFields();
+        for (int i = 0; i < subFields.size(); i++) {
+            t.set(i, extractPigObject(objList.get(i), subFields.get(i)));
+        }
+        return t;
+    }
+
+    public static Map<String, Object> transformToPigMap(Map<String, Object> map, HCatFieldSchema hfs) throws Exception {
+        if (map == null) {
+            return null;
+        }
+
+        Map<String, Object> result = new HashMap<String, Object>();
+        for (Entry<String, Object> entry : map.entrySet()) {
+            result.put(entry.getKey(), extractPigObject(entry.getValue(), hfs.getMapValueSchema().get(0)));
+        }
+        return result;
+    }
+
+    @SuppressWarnings("unchecked")
+    public static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
+        if (list == null) {
+            return null;
+        }
+
+        HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
+        DataBag db = new DefaultDataBag();
+        for (Object o : list) {
+            Tuple tuple;
+            if (elementSubFieldSchema.getType() == Type.STRUCT) {
+                tuple = transformToTuple((List<Object>) o, elementSubFieldSchema);
+            } else {
+                // bags always contain tuples
+                tuple = tupFac.newTuple(extractPigObject(o, elementSubFieldSchema));
+            }
+            db.add(tuple);
+        }
+        return db;
+    }
+
+
+    private static void validateHCatSchemaFollowsPigRules(HCatSchema tblSchema) throws PigException {
+        for (HCatFieldSchema hcatField : tblSchema.getFields()) {
+            validateHcatFieldFollowsPigRules(hcatField);
+        }
+    }
+
+    private static void validateHcatFieldFollowsPigRules(HCatFieldSchema hcatField) throws PigException {
+        try {
+            Type hType = hcatField.getType();
+            switch (hType) {
+            case BOOLEAN:
+                throw new PigException("Incompatible type found in hcat table schema: " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            case ARRAY:
+                validateHCatSchemaFollowsPigRules(hcatField.getArrayElementSchema());
+                break;
+            case STRUCT:
+                validateHCatSchemaFollowsPigRules(hcatField.getStructSubSchema());
+                break;
+            case MAP:
+                // key is only string
+                validateHCatSchemaFollowsPigRules(hcatField.getMapValueSchema());
+                break;
+            }
+        } catch (HCatException e) {
+            throw new PigException("Incompatible type found in hcat table schema: " + hcatField, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+        }
+    }
+
+
+    public static void validateHCatTableSchemaFollowsPigRules(HCatSchema hcatTableSchema) throws IOException {
+        validateHCatSchemaFollowsPigRules(hcatTableSchema);
+    }
+
+    public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
+        if (p.getProperty(propName) != null) {
+            config.set(propName, p.getProperty(propName));
+        }
+    }
+
+    public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
+        if (config.get(propName) != null) {
+            p.setProperty(propName, config.get(propName));
+        }
     }
-  }
 
 }
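
For orientation, a minimal sketch exercising the HCatalog-to-Pig type mapping that getPigType() implements above. It assumes the hcatalog-pig-adapter and pig jars on the classpath; the import path for HCatFieldSchema.Type is an assumption based on this module's layout.

import org.apache.hcatalog.data.schema.HCatFieldSchema.Type; // assumed path
import org.apache.hcatalog.pig.PigHCatUtil;
import org.apache.pig.PigException;
import org.apache.pig.data.DataType;

public class TypeMappingSketch {
    public static void main(String[] args) throws Exception {
        // Scalar mappings: STRING -> CHARARRAY, BIGINT -> LONG.
        System.out.println(PigHCatUtil.getPigType(Type.STRING) == DataType.CHARARRAY); // true
        System.out.println(PigHCatUtil.getPigType(Type.BIGINT) == DataType.LONG);      // true
        // Complex mappings: ARRAY -> BAG, STRUCT -> TUPLE.
        System.out.println(PigHCatUtil.getPigType(Type.ARRAY) == DataType.BAG);        // true
        System.out.println(PigHCatUtil.getPigType(Type.STRUCT) == DataType.TUPLE);     // true
        // BOOLEAN has no Pig column type and is rejected (code 1115), per getPigType() above.
        try {
            PigHCatUtil.getPigType(Type.BOOLEAN);
        } catch (PigException expected) {
            System.out.println(expected.getMessage());
        }
    }
}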

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java Mon Sep 10 23:28:55 2012
@@ -40,156 +40,156 @@ import org.apache.pig.data.Tuple;
 /**
  * based on {@link org.apache.pig.builtin.PigStorage}
  */
-public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable,Tuple> {
+public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable, Tuple> {
 
-  private final LoadFunc loadFunc;
-  private static ResourceFieldSchema[] fields;
+    private final LoadFunc loadFunc;
+    private static ResourceFieldSchema[] fields;
 
-  public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
+    public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
 
-    this.loadFunc = loadFunc;
-    fields = dataSchema.getFields();
-    
-    // Simulate the frontend call sequence for LoadFunc, in case LoadFunc need to store something into UDFContext (as JsonLoader does)
-    if (loadFunc instanceof LoadMetadata) {
-        ((LoadMetadata)loadFunc).getSchema(location, new Job(conf));
-    }
-  }
+        this.loadFunc = loadFunc;
+        fields = dataSchema.getFields();
 
-  @Override
-  public RecordReader<BytesWritable, Tuple> createRecordReader(
-      InputSplit split, TaskAttemptContext taskContext) throws IOException,
-      InterruptedException {
-    RecordReader<BytesWritable,Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext);
-    return new LoadFuncBasedRecordReader(reader, loadFunc);
-  }
-
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
-  InterruptedException {
-    try {
-      InputFormat<BytesWritable,Tuple> inpFormat = loadFunc.getInputFormat();
-      return inpFormat.getSplits(jobContext);
+        // Simulate the frontend call sequence for LoadFunc, in case LoadFunc needs to store something into UDFContext (as JsonLoader does)
+        if (loadFunc instanceof LoadMetadata) {
+            ((LoadMetadata) loadFunc).getSchema(location, new Job(conf));
+        }
+    }
 
-    } catch (InterruptedException    e) {
-      throw new IOException(e);
+    @Override
+    public RecordReader<BytesWritable, Tuple> createRecordReader(
+        InputSplit split, TaskAttemptContext taskContext) throws IOException,
+        InterruptedException {
+        RecordReader<BytesWritable, Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext);
+        return new LoadFuncBasedRecordReader(reader, loadFunc);
     }
-  }
 
-  static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> {
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
+        InterruptedException {
+        try {
+            InputFormat<BytesWritable, Tuple> inpFormat = loadFunc.getInputFormat();
+            return inpFormat.getSplits(jobContext);
+
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
 
-    private Tuple tupleFromDisk;
-    private final RecordReader<BytesWritable,Tuple> reader;
-    private final LoadFunc loadFunc;
-    private final LoadCaster caster;
+    static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> {
 
-     /**
-      * @param reader
-      * @param loadFunc
-      * @throws IOException
-      */
-     public LoadFuncBasedRecordReader(RecordReader<BytesWritable,Tuple> reader, LoadFunc loadFunc) throws IOException {
-       this.reader = reader;
-       this.loadFunc = loadFunc;
-       this.caster = loadFunc.getLoadCaster();
-     }
-
-     @Override
-     public void close() throws IOException {
-       reader.close();
-     }
-
-     @Override
-     public BytesWritable getCurrentKey() throws IOException,
-     InterruptedException {
-       return null;
-     }
-
-     @Override
-     public Tuple getCurrentValue() throws IOException, InterruptedException {
-
-       for(int i = 0; i < tupleFromDisk.size(); i++) {
-
-         Object data = tupleFromDisk.get(i);
-         
-         // We will do conversion for bytes only for now
-         if (data instanceof DataByteArray) {
-         
-             DataByteArray dba = (DataByteArray) data;
-    
-             if(dba == null) {
-               // PigStorage will insert nulls for empty fields.
-              tupleFromDisk.set(i, null);
-              continue;
+        private Tuple tupleFromDisk;
+        private final RecordReader<BytesWritable, Tuple> reader;
+        private final LoadFunc loadFunc;
+        private final LoadCaster caster;
+
+        /**
+         * @param reader
+         * @param loadFunc
+         * @throws IOException
+         */
+        public LoadFuncBasedRecordReader(RecordReader<BytesWritable, Tuple> reader, LoadFunc loadFunc) throws IOException {
+            this.reader = reader;
+            this.loadFunc = loadFunc;
+            this.caster = loadFunc.getLoadCaster();
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+
+        @Override
+        public BytesWritable getCurrentKey() throws IOException,
+            InterruptedException {
+            return null;
+        }
+
+        @Override
+        public Tuple getCurrentValue() throws IOException, InterruptedException {
+
+            for (int i = 0; i < tupleFromDisk.size(); i++) {
+
+                Object data = tupleFromDisk.get(i);
+
+                // We will do conversion for bytes only for now
+                if (data instanceof DataByteArray) {
+
+                    DataByteArray dba = (DataByteArray) data;
+
+                    if (dba == null) {
+                        // PigStorage will insert nulls for empty fields.
+                        tupleFromDisk.set(i, null);
+                        continue;
+                    }
+
+                    switch (fields[i].getType()) {
+
+                    case DataType.CHARARRAY:
+                        tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
+                        break;
+
+                    case DataType.INTEGER:
+                        tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
+                        break;
+
+                    case DataType.FLOAT:
+                        tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
+                        break;
+
+                    case DataType.LONG:
+                        tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
+                        break;
+
+                    case DataType.DOUBLE:
+                        tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
+                        break;
+
+                    case DataType.MAP:
+                        tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
+                        break;
+
+                    case DataType.BAG:
+                        tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
+                        break;
+
+                    case DataType.TUPLE:
+                        tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
+                        break;
+
+                    default:
+                        throw new IOException("Unknown Pig type in data: " + fields[i].getType());
+                    }
+                }
             }
-    
-             switch(fields[i].getType()) {
-    
-             case DataType.CHARARRAY:
-               tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
-               break;
-    
-             case DataType.INTEGER:
-               tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
-               break;
-    
-             case DataType.FLOAT:
-               tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
-               break;
-    
-             case DataType.LONG:
-               tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
-               break;
-    
-             case DataType.DOUBLE:
-               tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
-               break;
-    
-             case DataType.MAP:
-               tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
-               break;
-    
-             case DataType.BAG:
-               tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
-               break;
-    
-             case DataType.TUPLE:
-               tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
-               break;
-    
-             default:
-               throw new IOException("Unknown Pig type in data: "+fields[i].getType());
-             }
-           }
-       }
-
-       return tupleFromDisk;
-     }
-
-
-     @Override
-     public void initialize(InputSplit split, TaskAttemptContext ctx)
-     throws IOException, InterruptedException {
-
-       reader.initialize(split, ctx);
-       loadFunc.prepareToRead(reader, null);
-     }
-
-     @Override
-     public boolean nextKeyValue() throws IOException, InterruptedException {
-
-       // even if we don't need any data from disk, we will need to call
-       // getNext() on pigStorage() so we know how many rows to emit in our
-       // final output - getNext() will eventually return null when it has
-       // read all disk data and we will know to stop emitting final output
-       tupleFromDisk = loadFunc.getNext();
-       return tupleFromDisk != null;
-     }
-
-     @Override
-     public float getProgress() throws IOException, InterruptedException {
-       return 0;
-     }
 
-  }
+            return tupleFromDisk;
+        }
+
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext ctx)
+            throws IOException, InterruptedException {
+
+            reader.initialize(split, ctx);
+            loadFunc.prepareToRead(reader, null);
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+
+            // even if we don't need any data from disk, we will need to call
+            // getNext() on pigStorage() so we know how many rows to emit in our
+            // final output - getNext() will eventually return null when it has
+            // read all disk data and we will know to stop emitting final output
+            tupleFromDisk = loadFunc.getNext();
+            return tupleFromDisk != null;
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return 0;
+        }
+
+    }
 }
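
A minimal sketch of the LoadCaster conversion pattern used in getCurrentValue() above, assuming only the pig jar on the classpath. Utf8StorageConverter is the caster PigStorage hands back from getLoadCaster(), so the two conversions shown mirror the CHARARRAY and INTEGER branches of the switch.

import org.apache.pig.LoadCaster;
import org.apache.pig.builtin.Utf8StorageConverter;

public class CasterSketch {
    public static void main(String[] args) throws Exception {
        LoadCaster caster = new Utf8StorageConverter();
        byte[] raw = "42".getBytes("UTF-8");
        // getCurrentValue() dispatches on the declared field type; two branches shown here.
        Integer asInt = caster.bytesToInteger(raw);   // 42
        String asChar = caster.bytesToCharArray(raw); // "42"
        System.out.println(asInt + " / " + asChar);
    }
}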

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java Mon Sep 10 23:28:55 2012
@@ -38,82 +38,84 @@ import org.apache.pig.StoreMetadata;
 import org.apache.pig.data.Tuple;
 
 public class StoreFuncBasedOutputFormat extends
-        OutputFormat<BytesWritable, Tuple> {
+    OutputFormat<BytesWritable, Tuple> {
 
     private final StoreFuncInterface storeFunc;
-    
+
     public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
 
         this.storeFunc = storeFunc;
     }
-    
+
     @Override
     public void checkOutputSpecs(JobContext jobContext) throws IOException,
-            InterruptedException {
-        OutputFormat<BytesWritable,Tuple> outputFormat =  storeFunc.getOutputFormat();
+        InterruptedException {
+        OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
         outputFormat.checkOutputSpecs(jobContext);
     }
 
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
-            throws IOException, InterruptedException {
+        throws IOException, InterruptedException {
         String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
         ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
         String location = outputJobInfo.getLocation();
-        OutputFormat<BytesWritable,Tuple> outputFormat =  storeFunc.getOutputFormat();
+        OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
         return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
     }
 
     @Override
     public RecordWriter<BytesWritable, Tuple> getRecordWriter(
-            TaskAttemptContext ctx) throws IOException, InterruptedException {
-        RecordWriter<BytesWritable,Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
+        TaskAttemptContext ctx) throws IOException, InterruptedException {
+        RecordWriter<BytesWritable, Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
         String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        OutputJobInfo outputJobInfo = (OutputJobInfo)HCatUtil.deserialize(serializedJobInfo);
+        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
         ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
         String location = outputJobInfo.getLocation();
         return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
     }
-    
+
     static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
-        private final RecordWriter<BytesWritable,Tuple> writer;
+        private final RecordWriter<BytesWritable, Tuple> writer;
         private final StoreFuncInterface storeFunc;
         private final ResourceSchema schema;
         private final String location;
-        
-        public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable,Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
+
+        public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable, Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
             this.writer = writer;
             this.storeFunc = sf;
             this.schema = rs;
             this.location = location;
             storeFunc.prepareToWrite(writer);
         }
-        
+
         @Override
         public void close(TaskAttemptContext ctx) throws IOException,
-                InterruptedException {
+            InterruptedException {
             writer.close(ctx);
         }
 
         @Override
         public void write(BytesWritable key, Tuple value) throws IOException,
-                InterruptedException {
+            InterruptedException {
             storeFunc.putNext(value);
         }
     }
-    
+
     static class StoreFuncBasedOutputCommitter extends OutputCommitter {
         StoreFuncInterface sf;
         OutputCommitter wrappedOutputCommitter;
         String location;
         ResourceSchema rs;
+
         public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) {
             this.sf = sf;
             this.wrappedOutputCommitter = outputCommitter;
             this.location = location;
             this.rs = rs;
         }
+
         @Override
         public void abortTask(TaskAttemptContext context) throws IOException {
             wrappedOutputCommitter.abortTask(context);
@@ -126,7 +128,7 @@ public class StoreFuncBasedOutputFormat 
 
         @Override
         public boolean needsTaskCommit(TaskAttemptContext context)
-                throws IOException {
+            throws IOException {
             return wrappedOutputCommitter.needsTaskCommit(context);
         }
 
@@ -139,28 +141,28 @@ public class StoreFuncBasedOutputFormat 
         public void setupTask(TaskAttemptContext context) throws IOException {
             wrappedOutputCommitter.setupTask(context);
         }
-        
+
         public void commitJob(JobContext context) throws IOException {
             wrappedOutputCommitter.commitJob(context);
             if (sf instanceof StoreMetadata) {
                 if (rs != null) {
                     ((StoreMetadata) sf).storeSchema(
-                            rs, location, new Job(context.getConfiguration()) );
+                        rs, location, new Job(context.getConfiguration()));
                 }
             }
         }
-        
+
         @Override
         public void cleanupJob(JobContext context) throws IOException {
             wrappedOutputCommitter.cleanupJob(context);
             if (sf instanceof StoreMetadata) {
                 if (rs != null) {
                     ((StoreMetadata) sf).storeSchema(
-                            rs, location, new Job(context.getConfiguration()) );
+                        rs, location, new Job(context.getConfiguration()));
                 }
             }
         }
-        
+
         public void abortJob(JobContext context, JobStatus.State state) throws IOException {
             wrappedOutputCommitter.abortJob(context, state);
         }
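
A hypothetical construction sketch for the class above (names taken from the diff; pig jar assumed): PigStorage implements StoreFuncInterface, so it can be wrapped directly, after which checkOutputSpecs() and getRecordWriter() delegate to storeFunc.getOutputFormat().

import org.apache.hcatalog.pig.drivers.StoreFuncBasedOutputFormat;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.builtin.PigStorage;

public class StoreFuncWrapSketch {
    public static void main(String[] args) {
        // Wrap a comma-delimited PigStorage in the adapter output format.
        StoreFuncInterface storeFunc = new PigStorage(",");
        StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(storeFunc);
        System.out.println(outputFormat.getClass().getSimpleName());
    }
}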

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MockLoader.java Mon Sep 10 23:28:55 2012
@@ -40,138 +40,140 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.Tuple;
 
 public class MockLoader extends LoadFunc {
-  private static final class MockRecordReader extends RecordReader<Object, Object> {
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public Object getCurrentKey() throws IOException, InterruptedException {
-      return "mockKey";
-    }
-
-    @Override
-    public Object getCurrentValue() throws IOException, InterruptedException {
-      return "mockValue";
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return 0.5f;
+    private static final class MockRecordReader extends RecordReader<Object, Object> {
+        @Override
+        public void close() throws IOException {
+        }
+
+        @Override
+        public Object getCurrentKey() throws IOException, InterruptedException {
+            return "mockKey";
+        }
+
+        @Override
+        public Object getCurrentValue() throws IOException, InterruptedException {
+            return "mockValue";
+        }
+
+        @Override
+        public float getProgress() throws IOException, InterruptedException {
+            return 0.5f;
+        }
+
+        @Override
+        public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
+            InterruptedException {
+        }
+
+        @Override
+        public boolean nextKeyValue() throws IOException, InterruptedException {
+            return true;
+        }
+    }
+
+    private static final class MockInputSplit extends InputSplit implements Writable {
+        private String location;
+
+        public MockInputSplit() {
+        }
+
+        public MockInputSplit(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return new String[]{location};
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 10000000;
+        }
+
+        @Override
+        public boolean equals(Object arg0) {
+            return arg0 == this;
+        }
+
+        @Override
+        public int hashCode() {
+            return location.hashCode();
+        }
+
+        @Override
+        public void readFields(DataInput arg0) throws IOException {
+            location = arg0.readUTF();
+        }
+
+        @Override
+        public void write(DataOutput arg0) throws IOException {
+            arg0.writeUTF(location);
+        }
+    }
+
+    private static final class MockInputFormat extends InputFormat {
+
+        private final String location;
+
+        public MockInputFormat(String location) {
+            this.location = location;
+        }
+
+        @Override
+        public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
+            throws IOException, InterruptedException {
+            return new MockRecordReader();
+        }
+
+        @Override
+        public List getSplits(JobContext arg0) throws IOException, InterruptedException {
+            return Arrays.asList(new MockInputSplit(location));
+        }
     }
 
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext arg1) throws IOException,
-        InterruptedException {
-    }
+    private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
 
-    @Override
-    public boolean nextKeyValue() throws IOException, InterruptedException {
-      return true;
+    public static void setData(String location, Iterable<Tuple> data) {
+        locationToData.put(location, data);
     }
-  }
 
-  private static final class MockInputSplit extends InputSplit implements Writable  {
     private String location;
-    public MockInputSplit() {
-    }
-    public MockInputSplit(String location) {
-      this.location = location;
-    }
 
-    @Override
-    public String[] getLocations() throws IOException, InterruptedException {
-      return new String[] { location };
-    }
+    private Iterator<Tuple> data;
 
     @Override
-    public long getLength() throws IOException, InterruptedException {
-      return 10000000;
+    public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+        return location;
     }
 
     @Override
-    public boolean equals(Object arg0) {
-      return arg0==this;
+    public void setLocation(String location, Job job) throws IOException {
+        this.location = location;
+        if (location == null) {
+            throw new IOException("null location passed to MockLoader");
+        }
+        this.data = locationToData.get(location).iterator();
+        if (this.data == null) {
+            throw new IOException("No data configured for location: " + location);
+        }
     }
 
     @Override
-    public int hashCode() {
-      return location.hashCode();
+    public Tuple getNext() throws IOException {
+        if (data == null) {
+            throw new IOException("data was not correctly initialized in MockLoader");
+        }
+        return data.hasNext() ? data.next() : null;
     }
 
     @Override
-    public void readFields(DataInput arg0) throws IOException {
-      location = arg0.readUTF();
+    public InputFormat getInputFormat() throws IOException {
+        return new MockInputFormat(location);
     }
 
     @Override
-    public void write(DataOutput arg0) throws IOException {
-      arg0.writeUTF(location);
+    public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
     }
-  }
-
-  private static final class MockInputFormat extends InputFormat {
-
-    private final String location;
-
-    public MockInputFormat(String location) {
-      this.location = location;
-    }
-
-    @Override
-    public RecordReader createRecordReader(InputSplit arg0, TaskAttemptContext arg1)
-        throws IOException, InterruptedException {
-      return new MockRecordReader();
-    }
-
-    @Override
-    public List getSplits(JobContext arg0) throws IOException, InterruptedException {
-      return Arrays.asList(new MockInputSplit(location));
-    }
-  }
-
-  private static final Map<String, Iterable<Tuple>> locationToData = new HashMap<String, Iterable<Tuple>>();
-
-  public static void setData(String location, Iterable<Tuple> data) {
-    locationToData.put(location, data);
-  }
-
-  private String location;
-
-  private Iterator<Tuple> data;
-
-  @Override
-  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
-    return location;
-  }
-
-  @Override
-  public void setLocation(String location, Job job) throws IOException {
-    this.location = location;
-    if (location == null) {
-      throw new IOException("null location passed to MockLoader");
-    }
-    this.data = locationToData.get(location).iterator();
-    if (this.data == null) {
-      throw new IOException("No data configured for location: "+location);
-    }
-  }
-
-  @Override
-  public Tuple getNext() throws IOException {
-    if (data == null) {
-      throw new IOException("data was not correctly initialized in MockLoader");
-    }
-    return data.hasNext() ? data.next() : null;
-  }
-
-  @Override
-  public InputFormat getInputFormat() throws IOException {
-    return new MockInputFormat(location);
-  }
-
-  @Override
-  public void prepareToRead(RecordReader arg0, PigSplit arg1) throws IOException {
-  }
 
 }
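
A minimal sketch of how a test might seed MockLoader through setData() above; TupleFactory is Pig's standard tuple builder, and the location string is an arbitrary key.

import java.util.Arrays;
import java.util.List;
import org.apache.hcatalog.pig.MockLoader;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class MockLoaderSeedSketch {
    public static void main(String[] args) {
        TupleFactory tf = TupleFactory.getInstance();
        List<Tuple> rows = Arrays.asList(
            tf.newTuple(Arrays.<Object>asList("a", 1)),
            tf.newTuple(Arrays.<Object>asList("b", 2)));
        // A load through MockLoader at this location then yields exactly
        // these tuples from getNext(), with no filesystem involved.
        MockLoader.setData("mock://rows", rows);
    }
}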

Modified: incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java?rev=1383152&r1=1383151&r2=1383152&view=diff
==============================================================================
--- incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java (original)
+++ incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/MyPigStorage.java Mon Sep 10 23:28:55 2012
@@ -24,15 +24,16 @@ import org.apache.pig.data.Tuple;
 
 public class MyPigStorage extends PigStorage {
 
-  String arg2;
-  public MyPigStorage(String arg1, String arg2) throws IOException {
-    super(arg1);
-    this.arg2 = arg2;
-  }
-  
-  @Override
-  public void putNext(Tuple t) throws IOException {
-      t.append(arg2);
-      super.putNext(t);
-  }
+    String arg2;
+
+    public MyPigStorage(String arg1, String arg2) throws IOException {
+        super(arg1);
+        this.arg2 = arg2;
+    }
+
+    @Override
+    public void putNext(Tuple t) throws IOException {
+        t.append(arg2);
+        super.putNext(t);
+    }
 }
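
What putNext() above does to each row, as a standalone sketch (pig jar assumed): the tuple gains a trailing column holding the constructor's second argument before PigStorage writes it.

import java.util.Arrays;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;

public class AppendColumnSketch {
    public static void main(String[] args) throws Exception {
        Tuple t = TupleFactory.getInstance().newTuple(Arrays.<Object>asList("a", 1));
        t.append("extra"); // mirrors MyPigStorage.putNext() before super.putNext(t)
        System.out.println(t); // (a,1,extra)
    }
}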


