incubator-hcatalog-commits mailing list archives

From ga...@apache.org
Subject svn commit: r1091509 [5/8] - in /incubator/hcatalog/trunk: ./ bin/ ivy/ src/ src/docs/ src/docs/src/ src/docs/src/documentation/ src/docs/src/documentation/classes/ src/docs/src/documentation/conf/ src/docs/src/documentation/content/ src/docs/src/docum...
Date Tue, 12 Apr 2011 17:30:12 GMT
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/**
+ * The Class which handles querying the metadata server using the MetaStoreClient. The list of
+ * partitions matching the partition filter is fetched from the server and the information is
+ * serialized and written into the JobContext configuration. The inputInfo is also updated with
+ * info required in the client process context.
+ */
+public class InitializeInput {
+
+  /** The prefix for keys used for storage driver arguments */
+  private static final String HCAT_KEY_PREFIX = "hcat.";
+  private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+
+  private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, HCatTableInfo inputInfo) throws Exception {
+    if (inputInfo.getServerUri() != null){
+
+      hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+      hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+          inputInfo.getServerKerberosPrincipal());
+
+      hiveConf.set("hive.metastore.local", "false");
+      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputInfo.getServerUri());
+    }
+
+    return new HiveMetaStoreClient(hiveConf,null);
+  }
+
+  /**
+   * Set the input to use for the Job. This queries the metadata server with the specified partition predicates,
+   * gets the matching partitions, and puts the information into the configuration object.
+   * @param job the job object
+   * @param inputInfo the howl table input info
+   * @throws Exception
+   */
+  public static void setInput(Job job, HCatTableInfo inputInfo) throws Exception {
+
+    //* Create and initialize a JobInfo object
+    //* Serialize the JobInfo and save in the Job's Configuration object
+
+    HiveMetaStoreClient client = null;
+
+    try {
+      client = createHiveMetaClient(job.getConfiguration(),inputInfo);
+      Table table = client.getTable(inputInfo.getDatabaseName(), inputInfo.getTableName());
+      HCatSchema tableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+
+      List<PartInfo> partInfoList = new ArrayList<PartInfo>();
+
+      if( table.getPartitionKeys().size() != 0 ) {
+        //Partitioned table
+        List<Partition> parts = client.listPartitionsByFilter(
+            inputInfo.getDatabaseName(), inputInfo.getTableName(),
+            inputInfo.getFilter(), (short) -1);
+
+        // Default to 100,000 partitions if hcat.metastore.maxpartitions is not defined
+        int maxPart = hiveConf.getInt("hcat.metastore.maxpartitions", 100000);
+        if (parts != null && parts.size() > maxPart) {
+          throw new HCatException(ErrorType.ERROR_EXCEED_MAXPART, "total number of partitions is " + parts.size());
+        }
+
+        // populate partition info
+        for (Partition ptn : parts){
+          PartInfo partInfo = extractPartInfo(ptn.getSd(),ptn.getParameters());
+          partInfo.setPartitionValues(createPtnKeyValueMap(table,ptn));
+          partInfoList.add(partInfo);
+        }
+
+      }else{
+        //Non partitioned table
+        PartInfo partInfo = extractPartInfo(table.getSd(),table.getParameters());
+        partInfo.setPartitionValues(new HashMap<String,String>());
+        partInfoList.add(partInfo);
+      }
+
+      JobInfo howlJobInfo = new JobInfo(inputInfo, tableSchema, partInfoList);
+      inputInfo.setJobInfo(howlJobInfo);
+
+      job.getConfiguration().set(
+          HCatConstants.HCAT_KEY_JOB_INFO,
+          HCatUtil.serialize(howlJobInfo)
+      );
+    } finally {
+      if (client != null ) {
+        client.close();
+      }
+    }
+  }
+
+  private static Map<String, String> createPtnKeyValueMap(Table table, Partition ptn) throws IOException{
+    List<String> values = ptn.getValues();
+    if( values.size() != table.getPartitionKeys().size() ) {
+      throw new IOException("Partition values in partition inconsistent with table definition, table "
+          + table.getTableName() + " has "
+          + table.getPartitionKeys().size()
+          + " partition keys, partition has " + values.size() + " partition values" );
+    }
+
+    Map<String,String> ptnKeyValues = new HashMap<String,String>();
+
+    int i = 0;
+    for(FieldSchema schema : table.getPartitionKeys()) {
+      // CONCERN : the way this mapping goes, the order *needs* to be preserved for table.getPartitionKeys() and ptn.getValues()
+      ptnKeyValues.put(schema.getName().toLowerCase(), values.get(i));
+      i++;
+    }
+
+    return ptnKeyValues;
+  }
+
+  private static PartInfo extractPartInfo(StorageDescriptor sd, Map<String,String> parameters) throws IOException{
+    HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
+    String inputStorageDriverClass = null;
+    Properties howlProperties = new Properties();
+    if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){
+      inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS);
+    }else{
+      // attempt to default to RCFile if the storage descriptor says it's an RCFile
+      if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
+        inputStorageDriverClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
+      }else{
+        throw new IOException("No input storage driver classname found, cannot read partition");
+      }
+    }
+    for (String key : parameters.keySet()){
+      if (key.startsWith(HCAT_KEY_PREFIX)){
+        howlProperties.put(key, parameters.get(key));
+      }
+    }
+    return new PartInfo(schema,inputStorageDriverClass,  sd.getLocation(), howlProperties);
+  }
+
+
+
+  static StorerInfo extractStorerInfo(StorageDescriptor sd, Map<String, String> properties) throws IOException {
+    String inputSDClass, outputSDClass;
+
+    if (properties.containsKey(HCatConstants.HCAT_ISD_CLASS)){
+      inputSDClass = properties.get(HCatConstants.HCAT_ISD_CLASS);
+    }else{
+      // attempt to default to RCFile if the storage descriptor says it's an RCFile
+      if ((sd.getInputFormat() != null) && (sd.getInputFormat().equals(HCatConstants.HIVE_RCFILE_IF_CLASS))){
+        inputSDClass = HCatConstants.HCAT_RCFILE_ISD_CLASS;
+      }else{
+        throw new IOException("No input storage driver classname found for table, cannot write partition");
+      }
+    }
+
+    if (properties.containsKey(HCatConstants.HCAT_OSD_CLASS)){
+      outputSDClass = properties.get(HCatConstants.HCAT_OSD_CLASS);
+    }else{
+      // attempt to default to RCFile if the storage descriptor says it's an RCFile
+      if ((sd.getOutputFormat() != null) && (sd.getOutputFormat().equals(HCatConstants.HIVE_RCFILE_OF_CLASS))){
+        outputSDClass = HCatConstants.HCAT_RCFILE_OSD_CLASS;
+      }else{
+        throw new IOException("No output storage driver classname found for table, cannot write partition");
+      }
+    }
+
+    Properties howlProperties = new Properties();
+    for (String key : properties.keySet()){
+      if (key.startsWith(HCAT_KEY_PREFIX)){
+        howlProperties.put(key, properties.get(key));
+      }
+    }
+
+    return new StorerInfo(inputSDClass, outputSDClass, howlProperties);
+  }
+
+}
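The class comment above describes the front-end flow: partitions matching the filter are fetched from the metastore, wrapped into a JobInfo, and serialized into the job configuration under HCAT_KEY_JOB_INFO. A rough driver-side sketch of that round trip, using the HCatInputFormat and HCatTableInfo calls that appear in HCatLoader later in this commit; the URI, database, table, and filter values below are made up, and HCatUtil.deserialize is assumed to be the counterpart of the HCatUtil.serialize call above:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.HCatTableInfo;
import org.apache.hcatalog.mapreduce.JobInfo;

public class InputSetupSketch {
  public static void main(String[] args) throws Exception {
    Job job = new Job();
    // Front end: query the metastore and serialize a JobInfo into the job configuration.
    HCatInputFormat.setInput(job, HCatTableInfo.getInputTableInfo(
        "thrift://metastore.example.com:9083",  // server URI (assumed value)
        null,                                   // Kerberos principal, if any
        "default", "mytable",                   // database and table (assumed values)
        "datestamp=\"20110412\""));             // partition filter (assumed syntax)
    // Read the serialized JobInfo back out, as a task-side component would.
    // HCatUtil.deserialize is assumed here as the counterpart of serialize().
    JobInfo jobInfo = (JobInfo) HCatUtil.deserialize(
        job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO));
    System.out.println("partitions selected: " + jobInfo.getPartitions().size());
  }
}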

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/JobInfo.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.List;
+
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize and store the information read from the metadata server */
+public class JobInfo implements Serializable{
+
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
+
+    /** The db and table names. */
+    private final String dbName;
+    private final String tableName;
+
+    /** The table schema. */
+    private final HCatSchema tableSchema;
+
+    /** The list of partitions matching the filter. */
+    private final List<PartInfo> partitions;
+
+    /**
+     * Instantiates a new howl job info.
+     * @param howlTableInfo the table info carrying the database and table names
+     * @param tableSchema the table schema
+     * @param partitions the partitions
+     */
+    public JobInfo(HCatTableInfo howlTableInfo, HCatSchema tableSchema,
+            List<PartInfo> partitions) {
+        this.tableName = howlTableInfo.getTableName();
+        this.dbName = howlTableInfo.getDatabaseName();
+        this.tableSchema = tableSchema;
+        this.partitions = partitions;
+    }
+
+    /**
+     * Gets the value of dbName
+     * @return the dbName
+     */
+    public String getDatabaseName() {
+        return dbName;
+    }
+
+    /**
+     * Gets the value of tableName
+     * @return the tableName
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * Gets the value of tableSchema
+     * @return the tableSchema
+     */
+    public HCatSchema getTableSchema() {
+        return tableSchema;
+    }
+
+    /**
+     * Gets the value of partitions
+     * @return the partitions
+     */
+    public List<PartInfo> getPartitions() {
+        return partitions;
+    }
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize and store the output related information  */
+class OutputJobInfo implements Serializable {
+
+    /** The serialization version. */
+    private static final long serialVersionUID = 1L;
+
+    /** The table info provided by user. */
+    private final HCatTableInfo tableInfo;
+
+    /** The output schema. This is given to us by the user. It won't contain any
+     * partition columns, even if the user has specified them.
+     */
+    private HCatSchema outputSchema;
+
+    /** This is table schema, retrieved from metastore. */
+    private final HCatSchema tableSchema;
+
+    /** The storer info */
+    private final StorerInfo storerInfo;
+
+    /** The location of the partition being written */
+    private final String location;
+
+    /** The table being written to */
+    private final Table table;
+
+    /** The positions of partition columns, which will be deleted from the data
+     * if the data contains partition columns. */
+
+    private List<Integer> posOfPartCols;
+
+    /**
+     * @return the posOfPartCols
+     */
+    protected List<Integer> getPosOfPartCols() {
+      return posOfPartCols;
+    }
+
+    /**
+     * @param posOfPartCols the posOfPartCols to set
+     */
+    protected void setPosOfPartCols(List<Integer> posOfPartCols) {
+      // sorting the list in the descending order so that deletes happen back-to-front
+      Collections.sort(posOfPartCols, new Comparator<Integer> () {
+        @Override
+        public int compare(Integer earlier, Integer later) {
+          // compareTo avoids reference comparison of boxed Integers with ==
+          return later.compareTo(earlier);
+        }
+      });
+      this.posOfPartCols = posOfPartCols;
+    }
+
+    public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema,
+        StorerInfo storerInfo, String location, Table table) {
+      super();
+      this.tableInfo = tableInfo;
+      this.outputSchema = outputSchema;
+      this.tableSchema = tableSchema;
+      this.storerInfo = storerInfo;
+      this.location = location;
+      this.table = table;
+    }
+
+    /**
+     * @return the tableInfo
+     */
+    public HCatTableInfo getTableInfo() {
+      return tableInfo;
+    }
+
+    /**
+     * @return the outputSchema
+     */
+    public HCatSchema getOutputSchema() {
+      return outputSchema;
+    }
+
+    /**
+     * @param schema the outputSchema to set
+     */
+    public void setOutputSchema(HCatSchema schema) {
+      this.outputSchema = schema;
+    }
+
+    /**
+     * @return the tableSchema
+     */
+    public HCatSchema getTableSchema() {
+      return tableSchema;
+    }
+
+    /**
+     * @return the storerInfo
+     */
+    public StorerInfo getStorerInfo() {
+      return storerInfo;
+    }
+
+    /**
+     * @return the location
+     */
+    public String getLocation() {
+      return location;
+    }
+
+    /**
+     * Gets the value of table
+     * @return the table
+     */
+    public Table getTable() {
+      return table;
+    }
+
+}
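setPosOfPartCols above sorts the positions in descending order so that, as its comment notes, deletes happen back-to-front. A small, self-contained illustration of why that ordering matters when partition-column positions are removed from a record by index (the field values are made up):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BackToFrontRemovalSketch {
  public static void main(String[] args) {
    // A record with five fields; positions 1 and 3 hold partition column values.
    List<Object> record = new ArrayList<Object>(Arrays.<Object>asList("a", "p1", "b", "p2", "c"));
    // Positions sorted descending, as setPosOfPartCols arranges them.
    List<Integer> posOfPartCols = Arrays.asList(3, 1);
    for (int pos : posOfPartCols) {
      record.remove(pos);          // removing index 3 first leaves index 1 untouched
    }
    System.out.println(record);    // prints [a, b, c]
    // Removing in ascending order (1, then 3) would instead drop "p1" and "c",
    // because the later index shifts left after the first removal.
  }
}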

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/PartInfo.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+/** The class used to serialize the information read from the metadata server for a single partition */
+public class PartInfo implements Serializable {
+
+  /** The serialization version */
+  private static final long serialVersionUID = 1L;
+
+  /** The partition schema. */
+  private final HCatSchema partitionSchema;
+
+  /** The information about which input storage driver to use */
+  private final String inputStorageDriverClass;
+
+  /** Howl-specific properties set at the partition */
+  private final Properties howlProperties;
+
+  /** The data location. */
+  private final String location;
+
+  /** The map of partition key names and their values. */
+  private Map<String,String> partitionValues;
+
+  /**
+   * Instantiates a new howl partition info.
+   * @param partitionSchema the partition schema
+   * @param inputStorageDriverClass the input storage driver class name
+   * @param location the location
+   * @param howlProperties howl-specific properties at the partition
+   */
+  public PartInfo(HCatSchema partitionSchema, String inputStorageDriverClass, String location, Properties howlProperties){
+    this.partitionSchema = partitionSchema;
+    this.inputStorageDriverClass = inputStorageDriverClass;
+    this.location = location;
+    this.howlProperties = howlProperties;
+  }
+
+  /**
+   * Gets the value of partitionSchema.
+   * @return the partitionSchema
+   */
+  public HCatSchema getPartitionSchema() {
+    return partitionSchema;
+  }
+
+
+  /**
+   * Gets the value of input storage driver class name.
+   * @return the input storage driver class name
+   */
+  public String getInputStorageDriverClass() {
+    return inputStorageDriverClass;
+  }
+
+
+  /**
+   * Gets the value of howlProperties.
+   * @return the howlProperties
+   */
+  public Properties getInputStorageDriverProperties() {
+    return howlProperties;
+  }
+
+  /**
+   * Gets the value of location.
+   * @return the location
+   */
+  public String getLocation() {
+    return location;
+  }
+
+  /**
+   * Sets the partition values.
+   * @param partitionValues the new partition values
+   */
+  public void setPartitionValues(Map<String,String> partitionValues) {
+    this.partitionValues = partitionValues;
+  }
+
+  /**
+   * Gets the partition values.
+   * @return the partition values
+   */
+  public Map<String,String> getPartitionValues() {
+    return partitionValues;
+  }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/StorerInfo.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hcatalog.mapreduce;
+
+import java.io.Serializable;
+import java.util.Properties;
+
+/** Info about the storer to use for writing the data */
+class StorerInfo implements Serializable {
+
+    /** The serialization version */
+    private static final long serialVersionUID = 1L;
+
+    /** The name of the input storage driver class */
+    private String inputSDClass;
+
+    /** The name of the output storage driver class */
+    private String outputSDClass;
+
+    /** The properties for the storage driver */
+    private Properties properties;
+
+
+    /**
+     * Initialize the storer information.
+     * @param inputSDClass the name of the input storage driver class
+     * @param outputSDClass the name of the output storage driver class
+     * @param properties the properties for the storage driver
+     */
+    StorerInfo(String inputSDClass, String outputSDClass, Properties properties) {
+      super();
+      this.inputSDClass = inputSDClass;
+      this.outputSDClass = outputSDClass;
+      this.properties = properties;
+    }
+
+    /**
+     * @return the inputSDClass
+     */
+    public String getInputSDClass() {
+      return inputSDClass;
+    }
+
+    /**
+     * @param inputSDClass the inputSDClass to set
+     */
+    public void setInputSDClass(String inputSDClass) {
+      this.inputSDClass = inputSDClass;
+    }
+
+    /**
+     * @return the outputSDClass
+     */
+    public String getOutputSDClass() {
+      return outputSDClass;
+    }
+
+    /**
+     * @param outputSDClass the outputSDClass to set
+     */
+    public void setOutputSDClass(String outputSDClass) {
+      this.outputSDClass = outputSDClass;
+    }
+
+    /**
+     * @return the properties
+     */
+    public Properties getProperties() {
+      return properties;
+    }
+
+    /**
+     * @param properties the properties to set
+     */
+    public void setProperties(Properties properties) {
+      this.properties = properties;
+    }
+
+
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/oozie/JavaAction.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.oozie;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.cli.CliDriver;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
+
+public class JavaAction {
+
+  public static void main(String[] args) throws Exception{
+
+    HiveConf conf = new HiveConf();
+    conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+    conf.setVar(ConfVars.SEMANTIC_ANALYZER_HOOK, HCatSemanticAnalyzer.class.getName());
+    conf.setBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL, true);
+    SessionState.start(new CliSessionState(conf));
+    new CliDriver().processLine(args[0]);
+  }
+
+}
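JavaAction is a thin wrapper around the Hive CLI: it loads the configuration file named by the oozie.action.conf.xml system property, installs HCatSemanticAnalyzer as the semantic analyzer hook, enables SASL for the metastore, and runs the single command passed as args[0]. An illustrative direct invocation; the configuration path is an assumption, and in a real deployment Oozie sets the property and supplies the argument itself:

public class JavaActionSketch {
  public static void main(String[] args) throws Exception {
    // Oozie normally materializes the action configuration and sets this property.
    System.setProperty("oozie.action.conf.xml", "/tmp/action-conf.xml"); // assumed path
    // Run a single HiveQL/DDL command through the HCat-aware CLI driver.
    org.apache.hcatalog.oozie.JavaAction.main(new String[]{"show tables;"});
  }
}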

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.Expression.BinaryExpression;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig {@link LoadFunc} to read data from Howl
+ */
+
+public class HCatLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+
+  private static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
+  private static final String PARTITION_FILTER = "partition.filter";
+
+  private HCatInputFormat howlInputFormat = null;
+  private RecordReader<?, ?> reader;
+  private String dbName;
+  private String tableName;
+  private String howlServerUri;
+  private String signature;
+  private String partitionFilterString;
+  private final PigHCatUtil phutil = new PigHCatUtil();
+
+  HCatSchema outputSchema = null;
+
+  @Override
+  public InputFormat<?,?> getInputFormat() throws IOException {
+    if(howlInputFormat == null) {
+      howlInputFormat = new HCatInputFormat();
+    }
+    return howlInputFormat;
+  }
+
+  @Override
+  public Tuple getNext() throws IOException {
+    try {
+      HCatRecord hr =  (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
+      Tuple t = PigHCatUtil.transformToTuple(hr,outputSchema);
+      // TODO : we were discussing an iter interface, and also a LazyTuple
+      // change this when plans for that solidifies.
+      return t;
+    } catch (ExecException e) {
+      int errCode = 6018;
+      String errMsg = "Error while reading input";
+      throw new ExecException(errMsg, errCode,
+          PigException.REMOTE_ENVIRONMENT, e);
+    } catch (Exception eOther){
+      int errCode = 6018;
+      String errMsg = "Error converting read value to tuple";
+      throw new ExecException(errMsg, errCode,
+          PigException.REMOTE_ENVIRONMENT, eOther);
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
+    this.reader = reader;
+  }
+
+  @Override
+  public String relativeToAbsolutePath(String location, Path curDir) throws IOException {
+    return location;
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+
+    Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+    dbName = dbTablePair.first;
+    tableName = dbTablePair.second;
+
+    // get partitionFilterString stored in the UDFContext - it would have
+    // been stored there by an earlier call to setPartitionFilter
+    // call setInput on HCatInputFormat only in the frontend because internally
+    // it makes calls to the metastore server - we don't want these to happen in
+    // the backend
+    // in the hadoop front end mapred.task.id property will not be set in
+    // the Configuration
+    if (!HCatUtil.checkJobContextIfRunningFromBackend(job)){
+
+      HCatInputFormat.setInput(job, HCatTableInfo.getInputTableInfo(
+              howlServerUri!=null ? howlServerUri :
+                  (howlServerUri = PigHCatUtil.getHowlServerUri(job)),
+              PigHCatUtil.getHowlServerPrincipal(job),
+              dbName,
+              tableName,
+              getPartitionFilterString()));
+    }
+
+    // Need to also push projections by calling setOutputSchema on
+    // HCatInputFormat - we have to get the RequiredFields information
+    // from the UdfContext, translate it to an HCatSchema and then pass it.
+    // The reason we do this here is because setLocation() is called by
+    // Pig runtime at InputFormat.getSplits() and
+    // InputFormat.createRecordReader() time - we are not sure when
+    // HCatInputFormat needs to know about pruned projections - so doing it
+    // here will ensure we communicate to HCatInputFormat about pruned
+    // projections at getSplits() and createRecordReader() time
+
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties props = udfContext.getUDFProperties(this.getClass(),
+        new String[]{signature});
+    RequiredFieldList requiredFieldsInfo =
+      (RequiredFieldList)props.get(PRUNE_PROJECTION_INFO);
+    if(requiredFieldsInfo != null) {
+      // convert to an HCatSchema and pass to HCatInputFormat
+      try {
+        outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(),signature,this.getClass());
+        HCatInputFormat.setOutputSchema(job, outputSchema);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    } else{
+      // else - this means pig's optimizer never invoked the pushProjection
+      // method - so we need all fields and hence we should not call
+      // setOutputSchema on HCatInputFormat
+      if (HCatUtil.checkJobContextIfRunningFromBackend(job)){
+        try {
+          HCatSchema howlTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
+          outputSchema = howlTableSchema;
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  @Override
+  public String[] getPartitionKeys(String location, Job job)
+  throws IOException {
+    Table table = phutil.getTable(location,
+        howlServerUri!=null?howlServerUri:PigHCatUtil.getHowlServerUri(job),
+            PigHCatUtil.getHowlServerPrincipal(job));
+    List<FieldSchema> tablePartitionKeys = table.getPartitionKeys();
+    String[] partitionKeys = new String[tablePartitionKeys.size()];
+    for(int i = 0; i < tablePartitionKeys.size(); i++) {
+      partitionKeys[i] = tablePartitionKeys.get(i).getName();
+    }
+    return partitionKeys;
+  }
+
+  @Override
+  public ResourceSchema getSchema(String location, Job job) throws IOException {
+    Table table = phutil.getTable(location,
+        howlServerUri!=null?howlServerUri:PigHCatUtil.getHowlServerUri(job),
+            PigHCatUtil.getHowlServerPrincipal(job));
+    HCatSchema howlTableSchema = HCatUtil.getTableSchemaWithPtnCols(table);
+    try {
+      PigHCatUtil.validateHowlTableSchemaFollowsPigRules(howlTableSchema);
+    } catch (IOException e){
+      throw new PigException(
+          "Table schema incompatible for reading through HowlLoader :" + e.getMessage()
+          + ";[Table schema was "+ howlTableSchema.toString() +"]"
+          ,PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
+    storeInUDFContext(signature, HCatConstants.HCAT_TABLE_SCHEMA, howlTableSchema);
+    outputSchema = howlTableSchema;
+    return PigHCatUtil.getResourceSchema(howlTableSchema);
+  }
+
+  @Override
+  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+    // statistics not implemented currently
+    return null;
+  }
+
+  @Override
+  public void setPartitionFilter(Expression partitionFilter) throws IOException {
+    // convert the partition filter expression into a string expected by
+    // howl and pass it in setLocation()
+
+    partitionFilterString = getHowlComparisonString(partitionFilter);
+
+    // store this in the udf context so we can get it later
+    storeInUDFContext(signature,
+        PARTITION_FILTER, partitionFilterString);
+  }
+
+  @Override
+  public List<OperatorSet> getFeatures() {
+    return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+  }
+
+  @Override
+  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
+    // Store the required fields information in the UDFContext so that we
+    // can retrieve it later.
+    storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
+
+    // Howl will always prune columns based on what we ask of it - so the
+    // response is true
+    return new RequiredFieldResponse(true);
+  }
+
+  @Override
+  public void setUDFContextSignature(String signature) {
+    this.signature = signature;
+  }
+
+
+  // helper methods
+  private void storeInUDFContext(String signature, String key, Object value) {
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties props = udfContext.getUDFProperties(
+        this.getClass(), new String[] {signature});
+    props.put(key, value);
+  }
+
+
+  private String getPartitionFilterString() {
+    if(partitionFilterString == null) {
+      Properties props = UDFContext.getUDFContext().getUDFProperties(
+          this.getClass(), new String[] {signature});
+      partitionFilterString = props.getProperty(PARTITION_FILTER);
+    }
+    return partitionFilterString;
+  }
+
+  private String getHowlComparisonString(Expression expr) {
+    if(expr instanceof BinaryExpression){
+      // call getHowlComparisonString on lhs and rhs, and join the
+      // results with the OpType string
+
+      // we can just use OpType.toString() on all Expression types except
+      // Equal and NotEqual, since Equal has '==' in toString() and
+      // we need '='
+      String opStr = null;
+      switch(expr.getOpType()){
+        case OP_EQ :
+          opStr = " = ";
+          break;
+        default:
+          opStr = expr.getOpType().toString();
+      }
+      BinaryExpression be = (BinaryExpression)expr;
+      return "(" + getHowlComparisonString(be.getLhs()) +
+                  opStr +
+                  getHowlComparisonString(be.getRhs()) + ")";
+    } else {
+      // should be a constant or column
+      return expr.toString();
+    }
+  }
+
+}
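getHowlComparisonString above rebuilds the partition filter string by recursing over Pig's Expression tree, emitting '=' for OP_EQ (Pig's toString() renders equality as '==', which the filter syntax does not want) and falling back to the operator's own string form otherwise. A standalone sketch of that recursion over a hypothetical mini expression type; the real Pig Expression and BinaryExpression classes are not constructed here, and the operator spelling is simplified:

public class FilterStringSketch {

  /** Hypothetical stand-in for a Pig BinaryExpression node. */
  static class Node {
    final Node lhs, rhs;   // null for leaf nodes
    final String op;       // e.g. "==", "and", ">"
    final String leaf;     // column name or constant for leaf nodes
    Node(Node lhs, String op, Node rhs) { this.lhs = lhs; this.op = op; this.rhs = rhs; this.leaf = null; }
    Node(String leaf) { this.leaf = leaf; this.lhs = null; this.rhs = null; this.op = null; }
  }

  static String toFilterString(Node e) {
    if (e.leaf != null) {
      return e.leaf;                 // constant or column, used as-is
    }
    // The metastore filter wants '=' rather than '=='; other operators pass through.
    String opStr = "==".equals(e.op) ? " = " : " " + e.op + " ";
    return "(" + toFilterString(e.lhs) + opStr + toFilterString(e.rhs) + ")";
  }

  public static void main(String[] args) {
    Node filter = new Node(
        new Node(new Node("datestamp"), "==", new Node("'20110412'")),
        "and",
        new Node(new Node("region"), "==", new Node("'us'")));
    // Prints: ((datestamp = '20110412') and (region = 'us'))
    System.out.println(toFilterString(filter));
  }
}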

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,535 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.mapreduce.HCatOutputCommitter;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * HCatStorer.
+ *
+ */
+
+public class HCatStorer extends StoreFunc implements StoreMetadata {
+
+  /** Key under which the computed output HCatSchema is stored in the UDFContext properties. */
+  private static final String COMPUTED_OUTPUT_SCHEMA = "howl.output.schema";
+  private final Map<String,String> partitions;
+  private Schema pigSchema;
+  private RecordWriter<WritableComparable<?>, HCatRecord> writer;
+  private HCatSchema computedSchema;
+  private static final String PIG_SCHEMA = "howl.pig.store.schema";
+  private String sign;
+
+  public HCatStorer(String partSpecs, String schema) throws ParseException, FrontendException {
+
+    partitions = new HashMap<String, String>();
+    if(partSpecs != null && !partSpecs.trim().isEmpty()){
+      String[] partKVPs = partSpecs.split(",");
+      for(String partKVP : partKVPs){
+        String[] partKV = partKVP.split("=");
+        if(partKV.length == 2) {
+          partitions.put(partKV[0].trim(), partKV[1].trim());
+        } else {
+          throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+      }
+    }
+
+    if(schema != null) {
+      pigSchema = Utils.getSchemaFromString(schema);
+    }
+
+  }
+
+  public HCatStorer(String partSpecs) throws ParseException, FrontendException {
+    this(partSpecs, null);
+  }
+
+  public HCatStorer() throws FrontendException, ParseException{
+    this(null,null);
+  }
+
+  @Override
+  public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+
+    /*  Schema provided by user and the schema computed by Pig
+     * at the time of calling store must match.
+     */
+    Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
+    if(pigSchema != null){
+      if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){
+        throw new FrontendException("Schema provided in store statement doesn't match with the Schema " +
+            "returned by Pig run-time. Schema provided in HCatStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    } else {
+      pigSchema = runtimeSchema;
+    }
+    UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema));
+  }
+
+  /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
+   * schema of the table in metastore.
+   */
+  private HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
+
+    List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
+    for(FieldSchema fSchema : pigSchema.getFields()){
+      byte type = fSchema.type;
+      HCatFieldSchema howlFSchema;
+
+      try {
+
+        // Find out if we need to throw away the tuple or not.
+        if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){
+          List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+          arrFields.add(getHowlFSFromPigFS(fSchema.schema.getField(0).schema.getField(0)));
+          howlFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null);
+        } else {
+          howlFSchema = getHowlFSFromPigFS(fSchema);
+        }
+        fieldSchemas.add(howlFSchema);
+      } catch (HCatException he){
+        throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
+      }
+    }
+
+    return new HCatSchema(fieldSchemas);
+  }
+
+  private void validateUnNested(Schema innerSchema) throws FrontendException{
+
+    for(FieldSchema innerField : innerSchema.getFields()){
+      validateAlias(innerField.alias);
+      if(DataType.isComplex(innerField.type)) {
+        throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    }
+  }
+
+  private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
+
+    String colName = bagFieldSchema.alias;
+    for(HCatFieldSchema field : tableSchema.getFields()){
+      if(colName.equalsIgnoreCase(field.getName())){
+        return field.getArrayElementSchema().get(0).getType() != Type.STRUCT;
+      }
+    }
+    // Column was not found in the table schema. It's a new column.
+    List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
+    return tupSchema.size() == 1 && tupSchema.get(0).schema == null;
+  }
+
+
+  private HCatFieldSchema getHowlFSFromPigFS(FieldSchema fSchema) throws FrontendException, HCatException{
+
+    byte type = fSchema.type;
+    switch(type){
+
+    case DataType.CHARARRAY:
+    case DataType.BIGCHARARRAY:
+      return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
+
+    case DataType.INTEGER:
+      return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+
+    case DataType.LONG:
+      return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
+
+    case DataType.FLOAT:
+      return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
+
+    case DataType.DOUBLE:
+      return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
+
+    case DataType.BAG:
+      Schema bagSchema = fSchema.schema;
+      List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+      arrFields.add(getHowlFSFromPigFS(bagSchema.getField(0)));
+      return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
+
+    case DataType.TUPLE:
+      List<String> fieldNames = new ArrayList<String>();
+      List<HCatFieldSchema> howlFSs = new ArrayList<HCatFieldSchema>();
+      for( FieldSchema fieldSchema : fSchema.schema.getFields()){
+        fieldNames.add( fieldSchema.alias);
+        howlFSs.add(getHowlFSFromPigFS(fieldSchema));
+      }
+      return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(howlFSs), "");
+
+    case DataType.MAP:{
+      // Pig's schema contains no type information about a map's keys and
+      // values. So, if it's a new column, assume <string,string>; if it's an
+      // existing column, return whatever type the existing column contains.
+      HCatFieldSchema mapField = getTableCol(fSchema.alias, howlTblSchema);
+      HCatFieldSchema valFS;
+      List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
+
+      if(mapField != null){
+        Type mapValType = mapField.getMapValueSchema().get(0).getType();
+
+        switch(mapValType){
+        case STRING:
+        case BIGINT:
+        case INT:
+        case FLOAT:
+        case DOUBLE:
+          valFS = new HCatFieldSchema(fSchema.alias, mapValType, null);
+          break;
+        default:
+          throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+        valFSList.add(valFS);
+        return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
+      }
+
+      // Column not found in the target table. It's a new column; its schema is map<string,string>.
+      valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+      valFSList.add(valFS);
+      return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
+     }
+
+    default:
+      throw new FrontendException("Unsupported type: "+type+"  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
+    }
+  }
+
+  @Override
+  public OutputFormat getOutputFormat() throws IOException {
+    return new HCatOutputFormat();
+  }
+
+  @Override
+  public void prepareToWrite(RecordWriter writer) throws IOException {
+    this.writer = writer;
+    computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
+  }
+
+  @Override
+  public void putNext(Tuple tuple) throws IOException {
+
+    List<Object> outgoing = new ArrayList<Object>(tuple.size());
+
+    int i = 0;
+    for(HCatFieldSchema fSchema : computedSchema.getFields()){
+      outgoing.add(getJavaObj(tuple.get(i++), fSchema));
+    }
+    try {
+      writer.write(null, new DefaultHCatRecord(outgoing));
+    } catch (InterruptedException e) {
+      throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
+  }
+
+  private Object getJavaObj(Object pigObj, HCatFieldSchema howlFS) throws ExecException, HCatException{
+
+    // The real work-horse. Spend time and energy in this method if there is
+    // a need to keep HCatStorer lean and fast.
+    Type type = howlFS.getType();
+
+    switch(type){
+
+    case STRUCT:
+      // Unwrap the tuple.
+      return ((Tuple)pigObj).getAll();
+      //        Tuple innerTup = (Tuple)pigObj;
+      //
+      //      List<Object> innerList = new ArrayList<Object>(innerTup.size());
+      //      int i = 0;
+      //      for(HowlTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
+      //        innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
+      //      }
+      //      return innerList;
+    case ARRAY:
+      // Unwrap the bag.
+      DataBag pigBag = (DataBag)pigObj;
+      HCatFieldSchema tupFS = howlFS.getArrayElementSchema().get(0);
+      boolean needTuple = tupFS.getType() == Type.STRUCT;
+      List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
+      Iterator<Tuple> bagItr = pigBag.iterator();
+
+      while(bagItr.hasNext()){
+        // If there is only one element in tuple contained in bag, we throw away the tuple.
+        bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
+
+      }
+      return bagContents;
+
+      //    case MAP:
+      //     Map<String,DataByteArray> pigMap = (Map<String,DataByteArray>)pigObj;
+      //     Map<String,Long> typeMap = new HashMap<String, Long>();
+      //     for(Entry<String, DataByteArray> entry: pigMap.entrySet()){
+      //       typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
+      //     }
+      //     return typeMap;
+    default:
+      return pigObj;
+    }
+  }
+
+  @Override
+  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+
+    // Need to necessarily override this method since default impl assumes HDFS
+    // based location string.
+    return location;
+  }
+
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+    sign = signature;
+  }
+
+
+  private void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{
+
+    // Iterate through all the elements in Pig Schema and do validations as
+    // dictated by semantics, consult HCatSchema of table when need be.
+
+    for(FieldSchema pigField : pigSchema.getFields()){
+      byte type = pigField.type;
+      String alias = pigField.alias;
+      validateAlias(alias);
+      HCatFieldSchema howlField = getTableCol(alias, tblSchema);
+
+      if(DataType.isComplex(type)){
+        switch(type){
+
+        case DataType.MAP:
+          if(howlField != null){
+            if(howlField.getMapKeyType() != Type.STRING){
+              throw new FrontendException("Key Type of map must be String "+howlField,  PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            if(howlField.getMapValueSchema().get(0).isComplex()){
+              throw new FrontendException("Value type of map cannot be complex" + howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+          }
+          break;
+
+        case DataType.BAG:
+          // Only map is allowed as complex type in tuples inside bag.
+          for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
+            if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
+              throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            validateAlias(innerField.alias);
+          }
+          if(howlField != null){
+            // Do the same validation for HCatSchema.
+            HCatFieldSchema arrayFieldScehma = howlField.getArrayElementSchema().get(0);
+            Type hType = arrayFieldScehma.getType();
+            if(hType == Type.STRUCT){
+              for(HCatFieldSchema structFieldInBag : arrayFieldScehma.getStructSubSchema().getFields()){
+                if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
+                  throw new FrontendException("Nested Complex types not allowed "+ howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+              }
+            }
+            if(hType == Type.MAP){
+              if(arrayFieldScehma.getMapKeyType() != Type.STRING){
+                throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+              }
+              if(arrayFieldScehma.getMapValueSchema().get(0).isComplex()){
+                throw new FrontendException("Value type of map cannot be complex "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+              }
+            }
+            if(hType == Type.ARRAY) {
+              throw new FrontendException("Arrays cannot contain array within it. "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+          }
+          break;
+
+        case DataType.TUPLE:
+          validateUnNested(pigField.schema);
+          if(howlField != null){
+            for(HCatFieldSchema structFieldSchema : howlField.getStructSubSchema().getFields()){
+              if(structFieldSchema.isComplex()){
+                throw new FrontendException("Nested Complex types are not allowed."+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+              }
+            }
+          }
+          break;
+
+        default:
+          throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+      }
+    }
+
+    for(HCatFieldSchema howlField : tblSchema.getFields()){
+
+      // We don't do type promotion/demotion.
+      Type hType = howlField.getType();
+      switch(hType){
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        throw new FrontendException("Incompatible type found in howl table schema: "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    }
+  }
+
+  private void validateAlias(String alias) throws FrontendException{
+    if(alias == null) {
+      throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
+    }
+    if(alias.matches(".*[A-Z]+.*")) {
+      throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE);
+    }
+  }
+
+  // Finds column by name in HCatSchema, if not found returns null.
+  private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
+
+    for(HCatFieldSchema howlField : tblSchema.getFields()){
+      if(howlField.getName().equalsIgnoreCase(alias)){
+        return howlField;
+      }
+    }
+    // It's a new column.
+    return null;
+  }
+  HCatSchema howlTblSchema;
+
+  @Override
+  public void cleanupOnFailure(String location, Job job) throws IOException {
+    // No-op.
+  }
+
+  @Override
+  public void setStoreLocation(String location, Job job) throws IOException {
+
+    Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign});
+
+    String[] userStr = location.split("\\.");
+    HCatTableInfo tblInfo;
+    if(userStr.length == 2) {
+      tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHowlServerUri(job),
+          PigHCatUtil.getHowlServerPrincipal(job), userStr[0],userStr[1],partitions);
+    } else {
+      tblInfo = HCatTableInfo.getOutputTableInfo(PigHCatUtil.getHowlServerUri(job),
+          PigHCatUtil.getHowlServerPrincipal(job), null,userStr[0],partitions);
+    }
+
+    Configuration config = job.getConfiguration();
+    if(!HCatUtil.checkJobContextIfRunningFromBackend(job)){
+
+      Schema schema = (Schema)ObjectSerializer.deserialize(p.getProperty(PIG_SCHEMA));
+      if(schema != null){
+        pigSchema = schema;
+      }
+      if(pigSchema == null){
+        throw new FrontendException("Schema for data cannot be determined.", PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+      try{
+        HCatOutputFormat.setOutput(job, tblInfo);
+      } catch(HCatException he) {
+          // pass the message to the user - essentially something about the table
+          // information passed to HCatOutputFormat was not right
+          throw new PigException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      }
+      howlTblSchema = HCatOutputFormat.getTableSchema(job);
+      try{
+        doSchemaValidations(pigSchema, howlTblSchema);
+      } catch(HCatException he){
+        throw new FrontendException(he.getMessage(), PigHCatUtil.PIG_EXCEPTION_CODE, he);
+      }
+      computedSchema = convertPigSchemaToHCatSchema(pigSchema,howlTblSchema);
+      HCatOutputFormat.setSchema(job, computedSchema);
+      p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+      if(config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
+        p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
+      }
+      if(config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
+        p.setProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
+            config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+      }
+      p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
+
+    }else{
+      config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
+      if(p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
+        config.set(HCatConstants.HCAT_KEY_HIVE_CONF, p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
+      }
+      if(p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
+        config.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
+            p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+      }
+    }
+  }
+
+  @Override
+  public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
+    if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
+      //In local mode, MapReduce will not call HCatOutputCommitter.cleanupJob.
+      //Calling it from here so that the partition publish happens.
+      //This call needs to be removed after MAPREDUCE-1447 is fixed.
+      new HCatOutputCommitter(null).cleanupJob(job);
+    }
+  }
+
+  @Override
+  public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
+  }
+}
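
A minimal sketch of how setStoreLocation above resolves its location argument. The
database and table names below are hypothetical, and the snippet only re-traces the
split logic shown in the method; it does not talk to HCatalog.

public class StoreLocationSketch {
  public static void main(String[] args) {
    for (String location : new String[] {"mydb.mytable", "mytable"}) {
      String[] userStr = location.split("\\.");
      // Two tokens mean "<database>.<table>"; a single token means the table lives
      // in the default database (the null database argument in setStoreLocation).
      String db = (userStr.length == 2) ? userStr[0] : null;
      String table = (userStr.length == 2) ? userStr[1] : userStr[0];
      System.out.println(location + " -> db=" + db + ", table=" + table);
    }
  }
}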

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.data.HCatArrayBag;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.DefaultDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.util.UDFContext;
+
+public class PigHCatUtil {
+
+  static final int PIG_EXCEPTION_CODE = 1115; // http://wiki.apache.org/pig/PigErrorHandlingFunctionalSpecification#Error_codes
+  private static final String DEFAULT_DB = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+
+  private final  Map<Pair<String,String>, Table> howlTableCache =
+    new HashMap<Pair<String,String>, Table>();
+
+  private static final TupleFactory tupFac = TupleFactory.getInstance();
+
+  static public Pair<String, String> getDBTableNames(String location) throws IOException {
+    // the location string will be of the form:
+    // <database name>.<table name> - parse it and
+    // communicate the information to HCatInputFormat
+
+    String[] dbTableNametokens = location.split("\\.");
+    if(dbTableNametokens.length == 1) {
+      return new Pair<String,String>(DEFAULT_DB,location);
+    }else if (dbTableNametokens.length == 2) {
+      return new Pair<String, String>(dbTableNametokens[0], dbTableNametokens[1]);
+    }else{
+      String locationErrMsg = "The input location in load statement " +
+      "should be of the form " +
+      "<databasename>.<table name> or <table name>. Got " + location;
+      throw new PigException(locationErrMsg, PIG_EXCEPTION_CODE);
+    }
+  }
+
+  static public String getHowlServerUri(Job job) {
+
+    return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_URI);
+  }
+
+  static public String getHowlServerPrincipal(Job job) {
+
+    return job.getConfiguration().get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+  }
+
+  static HiveMetaStoreClient client = null;
+
+  private static HiveMetaStoreClient createHiveMetaClient(String serverUri,
+      String serverKerberosPrincipal, Class clazz) throws Exception {
+    if (client != null){
+      return client;
+    }
+    HiveConf hiveConf = new HiveConf(clazz);
+
+    if (serverUri != null){
+
+      hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL, true);
+      hiveConf.setVar(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL, serverKerberosPrincipal);
+
+      hiveConf.set("hive.metastore.local", "false");
+      hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, serverUri.trim());
+    }
+    try {
+      client = new HiveMetaStoreClient(hiveConf,null);
+    } catch (Exception e){
+      throw new Exception("Could not instantiate a HiveMetaStoreClient connecting to server uri:["+serverUri+"]",e);
+    }
+    return client;
+  }
+
+
+  HCatSchema getHCatSchema(List<RequiredField> fields, String signature, Class<?> classForUDFCLookup) throws IOException {
+    if(fields == null) {
+      return null;
+    }
+
+    Properties props = UDFContext.getUDFContext().getUDFProperties(
+        classForUDFCLookup, new String[] {signature});
+    HCatSchema howlTableSchema = (HCatSchema) props.get(HCatConstants.HCAT_TABLE_SCHEMA);
+
+    ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+    for(RequiredField rf: fields) {
+      fcols.add(howlTableSchema.getFields().get(rf.getIndex()));
+    }
+    return new HCatSchema(fcols);
+  }
+
+  public Table getTable(String location, String howlServerUri, String howlServerPrincipal) throws IOException{
+    Pair<String, String> loc_server = new Pair<String,String>(location, howlServerUri);
+    Table howlTable = howlTableCache.get(loc_server);
+    if(howlTable != null){
+      return howlTable;
+    }
+
+    Pair<String, String> dbTablePair = PigHCatUtil.getDBTableNames(location);
+    String dbName = dbTablePair.first;
+    String tableName = dbTablePair.second;
+    Table table = null;
+    try {
+      client = createHiveMetaClient(howlServerUri, howlServerPrincipal, PigHCatUtil.class);
+      table = client.getTable(dbName, tableName);
+    } catch (NoSuchObjectException nsoe){
+      throw new PigException("Table not found : " + nsoe.getMessage(), PIG_EXCEPTION_CODE); // prettier error messages to frontend
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    howlTableCache.put(loc_server, table);
+    return table;
+  }
+
+  public static ResourceSchema getResourceSchema(HCatSchema howlSchema) throws IOException {
+
+    List<ResourceFieldSchema> rfSchemaList = new ArrayList<ResourceFieldSchema>();
+    for (HCatFieldSchema hfs : howlSchema.getFields()){
+      ResourceFieldSchema rfSchema;
+      rfSchema = getResourceSchemaFromFieldSchema(hfs);
+      rfSchemaList.add(rfSchema);
+    }
+    ResourceSchema rSchema = new ResourceSchema();
+    rSchema.setFields(rfSchemaList.toArray(new ResourceFieldSchema[0]));
+    return rSchema;
+
+  }
+
+  private static ResourceFieldSchema getResourceSchemaFromFieldSchema(HCatFieldSchema hfs)
+      throws IOException {
+    ResourceFieldSchema rfSchema;
+    // if we are dealing with a bag or tuple column - need to worry about subschema
+    if(hfs.getType() == Type.STRUCT) {
+        rfSchema = new ResourceFieldSchema()
+          .setName(hfs.getName())
+          .setDescription(hfs.getComment())
+          .setType(getPigType( hfs))
+          .setSchema(getTupleSubSchema(hfs));
+    } else if(hfs.getType() == Type.ARRAY) {
+        rfSchema = new ResourceFieldSchema()
+          .setName(hfs.getName())
+          .setDescription(hfs.getComment())
+          .setType(getPigType( hfs))
+          .setSchema(getBagSubSchema(hfs));
+    } else {
+      rfSchema = new ResourceFieldSchema()
+          .setName(hfs.getName())
+          .setDescription(hfs.getComment())
+          .setType(getPigType( hfs))
+          .setSchema(null); // no munging inner-schemas
+    }
+    return rfSchema;
+  }
+
+  private static ResourceSchema getBagSubSchema(HCatFieldSchema hfs) throws IOException {
+    // there are two cases - array<Type> and array<struct<...>>
+    // in either case the element type of the array is represented in a
+    // tuple field schema in the bag's field schema - the second case (struct)
+    // more naturally translates to the tuple - in the first case (array<Type>)
+    // we simulate the tuple by putting the single field in a tuple
+    ResourceFieldSchema[] bagSubFieldSchemas = new ResourceFieldSchema[1];
+    bagSubFieldSchemas[0] = new ResourceFieldSchema().setName("innertuple")
+      .setDescription("The tuple in the bag")
+      .setType(DataType.TUPLE);
+    HCatFieldSchema arrayElementFieldSchema = hfs.getArrayElementSchema().get(0);
+    if(arrayElementFieldSchema.getType() == Type.STRUCT) {
+      bagSubFieldSchemas[0].setSchema(getTupleSubSchema(arrayElementFieldSchema));
+    } else {
+      ResourceFieldSchema[] innerTupleFieldSchemas = new ResourceFieldSchema[1];
+      innerTupleFieldSchemas[0] = new ResourceFieldSchema().setName("innerfield")
+        .setDescription("The inner field in the tuple in the bag")
+        .setType(getPigType(arrayElementFieldSchema))
+        .setSchema(null); // the element type is not a tuple - so no subschema
+      bagSubFieldSchemas[0].setSchema(new ResourceSchema().setFields(innerTupleFieldSchemas));
+    }
+    return new ResourceSchema().setFields(bagSubFieldSchemas);
+
+  }
+
+  private static ResourceSchema getTupleSubSchema(HCatFieldSchema hfs) throws IOException {
+    // for each struct subfield, create equivalent ResourceFieldSchema
+    ResourceSchema s = new ResourceSchema();
+    List<ResourceFieldSchema> lrfs = new ArrayList<ResourceFieldSchema>();
+    for(HCatFieldSchema subField : hfs.getStructSubSchema().getFields()) {
+      lrfs.add(getResourceSchemaFromFieldSchema(subField));
+    }
+    s.setFields(lrfs.toArray(new ResourceFieldSchema[0]));
+    return s;
+  }
+
+  /**
+   * @param hfs the HCatalog column field schema
+   * @return the corresponding Pig type
+   * @throws IOException
+   */
+  static public byte getPigType(HCatFieldSchema hfs) throws IOException {
+    return getPigType(hfs.getType());
+  }
+
+  static public byte getPigType(Type type) throws IOException {
+    String errMsg;
+
+    if (type == Type.STRING){
+      return DataType.CHARARRAY;
+    }
+
+    if ( (type == Type.INT) || (type == Type.SMALLINT) || (type == Type.TINYINT)){
+      return DataType.INTEGER;
+    }
+
+    if (type == Type.ARRAY){
+      return DataType.BAG;
+    }
+
+    if (type == Type.STRUCT){
+      return DataType.TUPLE;
+    }
+
+    if (type == Type.MAP){
+      return DataType.MAP;
+    }
+
+    if (type == Type.BIGINT){
+      return DataType.LONG;
+    }
+
+    if (type == Type.FLOAT){
+      return DataType.FLOAT;
+    }
+
+    if (type == Type.DOUBLE){
+      return DataType.DOUBLE;
+    }
+
+    if (type == Type.BOOLEAN){
+      errMsg = "HCatalog column type 'BOOLEAN' is not supported in " +
+      "Pig as a column type";
+      throw new PigException(errMsg, PIG_EXCEPTION_CODE);
+    }
+
+    errMsg = "HCatalog column type '"+ type.toString() +"' is not supported in Pig as a column type";
+    throw new PigException(errMsg, PIG_EXCEPTION_CODE);
+  }
+
+  public static Tuple transformToTuple(HCatRecord hr, HCatSchema hs) throws Exception {
+      if (hr == null){
+        return null;
+      }
+      return transformToTuple(hr.getAll(),hs);
+    }
+
+  @SuppressWarnings("unchecked")
+  public static Object extractPigObject(Object o, HCatFieldSchema hfs) throws Exception {
+      Type itemType = hfs.getType();
+      if ( ! hfs.isComplex()){
+        return o;
+      } else  if (itemType == Type.STRUCT) {
+        return transformToTuple((List<Object>)o,hfs);
+      } else  if (itemType == Type.ARRAY) {
+        return transformToBag((List<? extends Object>) o,hfs);
+      } else  if (itemType == Type.MAP) {
+        return transformToPigMap((Map<String, Object>)o,hfs);
+      }
+      return null; // unreachable: every complex type is handled above.
+  }
+
+  public static Tuple transformToTuple(List<? extends Object> objList, HCatFieldSchema hfs) throws Exception {
+      try {
+          return transformToTuple(objList,hfs.getStructSubSchema());
+      } catch (Exception e){
+          if (hfs.getType() != Type.STRUCT){
+              throw new Exception("Expected Struct type, got "+hfs.getType());
+          } else {
+              throw e;
+          }
+      }
+  }
+
+  public static Tuple transformToTuple(List<? extends Object> objList, HCatSchema hs) throws Exception {
+        if (objList == null){
+          return null;
+        }
+        Tuple t = tupFac.newTuple(objList.size());
+        List<HCatFieldSchema> subFields = hs.getFields();
+        for (int i = 0; i < subFields.size(); i++){
+          t.set(i,extractPigObject(objList.get(i), subFields.get(i)));
+        }
+        return t;
+  }
+
+  public static Map<String,Object> transformToPigMap(Map<String,Object> map, HCatFieldSchema hfs) throws Exception {
+      return map;
+    }
+
+  @SuppressWarnings("unchecked")
+  public static DataBag transformToBag(List<? extends Object> list, HCatFieldSchema hfs) throws Exception {
+    if (list == null){
+      return null;
+    }
+
+    HCatFieldSchema elementSubFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
+    if (elementSubFieldSchema.getType() == Type.STRUCT){
+      DataBag db = new DefaultDataBag();
+      for (Object o : list){
+        db.add(transformToTuple((List<Object>)o,elementSubFieldSchema));
+      }
+      return db;
+    } else {
+      return  new HCatArrayBag(list);
+    }
+  }
+
+
+  public static void validateHowlTableSchemaFollowsPigRules(HCatSchema howlTableSchema) throws IOException {
+      for (HCatFieldSchema hfs : howlTableSchema.getFields()){
+          Type htype = hfs.getType();
+          if (htype == Type.ARRAY){
+              validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(hfs);
+          }else if (htype == Type.STRUCT){
+              validateIsPigCompatibleStructWithPrimitives(hfs);
+          }else if (htype == Type.MAP){
+              validateIsPigCompatibleMapWithPrimitives(hfs);
+          }else {
+              validateIsPigCompatiblePrimitive(hfs);
+          }
+      }
+  }
+
+  private static void validateIsPigCompatibleArrayWithPrimitivesOrSimpleComplexTypes(
+          HCatFieldSchema hfs) throws IOException {
+      HCatFieldSchema subFieldSchema = hfs.getArrayElementSchema().getFields().get(0);
+      if (subFieldSchema.getType() == Type.STRUCT){
+          validateIsPigCompatibleStructWithPrimitives(subFieldSchema);
+      }else if (subFieldSchema.getType() == Type.MAP) {
+          validateIsPigCompatiblePrimitive(subFieldSchema.getMapValueSchema().getFields().get(0));
+      }else {
+          validateIsPigCompatiblePrimitive(subFieldSchema);
+      }
+  }
+
+  private static void validateIsPigCompatibleMapWithPrimitives(HCatFieldSchema hfs) throws IOException{
+      if (hfs.getMapKeyType() != Type.STRING){
+          throw new PigException("Incompatible type in schema, found map with " +
+                  "non-string key type in :"+hfs.getTypeString(), PIG_EXCEPTION_CODE);
+      }
+      validateIsPigCompatiblePrimitive(hfs.getMapValueSchema().getFields().get(0));
+  }
+
+  private static void validateIsPigCompatibleStructWithPrimitives(HCatFieldSchema hfs) throws IOException {
+      for ( HCatFieldSchema subField : hfs.getStructSubSchema().getFields()){
+          validateIsPigCompatiblePrimitive(subField);
+      }
+  }
+
+  private static void validateIsPigCompatiblePrimitive(HCatFieldSchema hfs) throws IOException {
+      Type htype = hfs.getType();
+      if (
+          (hfs.isComplex()) ||
+          (htype == Type.TINYINT) ||
+          (htype == Type.SMALLINT)
+          ) {
+        throw new PigException("Incompatible type in schema, expected a Pig-" +
+            "compatible primitive for: " + hfs.getTypeString(), PIG_EXCEPTION_CODE);
+      }
+
+  }
+
+}
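
Illustrative only: a small sketch of the two PigHCatUtil helpers most callers touch,
getDBTableNames and getPigType. The "mydb"/"mytable" values are made up, and the
default database name is whatever MetaStoreUtils.DEFAULT_DATABASE_NAME resolves to
(normally "default").

import java.io.IOException;

import org.apache.hcatalog.data.Pair;
import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
import org.apache.hcatalog.pig.PigHCatUtil;
import org.apache.pig.data.DataType;

public class PigHCatUtilSketch {
  public static void main(String[] args) throws IOException {
    // "<db>.<table>" splits into a (db, table) pair; a bare table name falls back
    // to the metastore default database.
    Pair<String, String> withDb = PigHCatUtil.getDBTableNames("mydb.mytable");
    Pair<String, String> bare = PigHCatUtil.getDBTableNames("mytable");
    System.out.println(withDb.first + "." + withDb.second);  // mydb.mytable
    System.out.println(bare.first + "." + bare.second);      // default.mytable

    // Type mapping as implemented in getPigType: string -> chararray,
    // int/smallint/tinyint -> integer, array -> bag, struct -> tuple, map -> map.
    System.out.println(PigHCatUtil.getPigType(Type.STRING) == DataType.CHARARRAY); // true
    System.out.println(PigHCatUtil.getPigType(Type.ARRAY) == DataType.BAG);        // true
  }
}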

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.pig.PigHCatUtil;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.Tuple;
+
+
+/**
+ * This is a base class which wraps a Pig LoadFunc in an HCatInputStorageDriver.
+ * If you already have a LoadFunc, then this class along with LoadFuncBasedInputFormat
+ * does all the heavy lifting. For a new HCatalog input storage driver, just extend it
+ * and override initialize(). {@link PigStorageInputDriver} illustrates
+ * that well.
+ */
+public abstract class LoadFuncBasedInputDriver extends HCatInputStorageDriver{
+
+  private LoadFuncBasedInputFormat inputFormat;
+  private HCatSchema dataSchema;
+  private Map<String,String> partVals;
+  private List<String> desiredColNames;
+  protected LoadFunc lf;
+
+  @Override
+  public HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue)
+      throws IOException {
+
+    List<Object> data = ((Tuple)baseValue).getAll();
+    List<Object> howlRecord = new ArrayList<Object>(desiredColNames.size());
+
+    /* Iterate through the columns asked for in the output schema and look them up
+     * in the original data schema. If found, use that value. Otherwise look the
+     * column up in the partition values and use that. Otherwise it is a new column,
+     * so put null. The partition-value map lookup returns null for missing columns.
+     */
+    for(String colName : desiredColNames){
+      Integer idx = dataSchema.getPosition(colName);
+      howlRecord.add( idx != null ? data.get(idx) : partVals.get(colName));
+    }
+    return new DefaultHCatRecord(howlRecord);
+  }
+
+  @Override
+  public InputFormat<? extends WritableComparable, ? extends Writable> getInputFormat(
+      Properties howlProperties) {
+
+    return inputFormat;
+  }
+
+  @Override
+  public void setOriginalSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException {
+
+    dataSchema = howlSchema;
+  }
+
+  @Override
+  public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema) throws IOException {
+
+    desiredColNames = howlSchema.getFieldNames();
+  }
+
+  @Override
+  public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues)
+      throws IOException {
+
+    partVals = partitionValues;
+  }
+
+  @Override
+  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
+
+    lf.setLocation(location, new Job(context.getConfiguration()));
+    inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema));
+  }
+
+  private String location;
+
+  @Override
+  public void setInputPath(JobContext jobContext, String location) throws IOException {
+
+    this.location = location;
+    super.setInputPath(jobContext, location);
+  }
+}
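
As the class comment says, a new driver only needs to pick a LoadFunc and override
initialize(). The class below is a hypothetical example (it is not part of this
commit); the in-tree PigStorageInputDriver that follows is the real illustration.

package org.apache.hcatalog.pig.drivers;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.builtin.PigStorage;

// Hypothetical driver that reads comma-delimited text by wrapping PigStorage(",").
// The base class supplies the InputFormat wiring and the Tuple -> HCatRecord
// conversion, so only the LoadFunc choice lives here.
public class CsvInputDriver extends LoadFuncBasedInputDriver {

  @Override
  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
    lf = new PigStorage(",");
    super.initialize(context, storageDriverArgs);
  }
}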

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadCaster;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+/**
+ * based on {@link PigStorage}
+ */
+public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable,Tuple> {
+
+  private final LoadFunc loadFunc;
+  private static ResourceFieldSchema[] fields;
+
+  public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema) {
+
+    this.loadFunc = loadFunc;
+    fields = dataSchema.getFields();
+  }
+
+  @Override
+  public RecordReader<BytesWritable, Tuple> createRecordReader(
+      InputSplit split, TaskAttemptContext taskContext) throws IOException,
+      InterruptedException {
+    RecordReader<BytesWritable,Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext);
+    return new LoadFuncBasedRecordReader(reader, loadFunc);
+  }
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
+  InterruptedException {
+
+    try {
+      InputFormat<BytesWritable,Tuple> inpFormat = loadFunc.getInputFormat();
+      return inpFormat.getSplits(jobContext);
+
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> {
+
+    private Tuple tupleFromDisk;
+    private final RecordReader<BytesWritable,Tuple> reader;
+    private final LoadFunc loadFunc;
+    private final LoadCaster caster;
+
+     /**
+      * @param reader
+      * @param loadFunc
+      * @throws IOException
+      */
+     public LoadFuncBasedRecordReader(RecordReader<BytesWritable,Tuple> reader, LoadFunc loadFunc) throws IOException {
+       this.reader = reader;
+       this.loadFunc = loadFunc;
+       this.caster = loadFunc.getLoadCaster();
+     }
+
+     @Override
+     public void close() throws IOException {
+       reader.close();
+     }
+
+     @Override
+     public BytesWritable getCurrentKey() throws IOException,
+     InterruptedException {
+       return null;
+     }
+
+     @Override
+     public Tuple getCurrentValue() throws IOException, InterruptedException {
+
+       for(int i = 0; i < tupleFromDisk.size(); i++) {
+
+         DataByteArray dba = (DataByteArray) tupleFromDisk.get(i);
+
+         if(dba == null) {
+           // PigStorage will insert nulls for empty fields.
+           tupleFromDisk.set(i, null);
+           continue;
+         }
+
+         switch(fields[i].getType()) {
+
+         case DataType.CHARARRAY:
+           tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
+           break;
+
+         case DataType.INTEGER:
+           tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
+           break;
+
+         case DataType.FLOAT:
+           tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
+           break;
+
+         case DataType.LONG:
+           tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
+           break;
+
+         case DataType.DOUBLE:
+           tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
+           break;
+
+         case DataType.MAP:
+           tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
+           break;
+
+         case DataType.BAG:
+           tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
+           break;
+
+         case DataType.TUPLE:
+           tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
+           break;
+
+         default:
+           throw new IOException("Unknown Pig type in data: "+fields[i].getType());
+         }
+       }
+
+       return tupleFromDisk;
+     }
+
+
+     @Override
+     public void initialize(InputSplit split, TaskAttemptContext ctx)
+     throws IOException, InterruptedException {
+
+       reader.initialize(split, ctx);
+       loadFunc.prepareToRead(reader, null);
+     }
+
+     @Override
+     public boolean nextKeyValue() throws IOException, InterruptedException {
+
+       // Even if we don't need any data from disk, we still need to call
+       // getNext() on the wrapped LoadFunc so we know how many rows to emit
+       // in our final output - getNext() eventually returns null once it has
+       // read all the disk data, and that tells us to stop emitting output.
+       tupleFromDisk = loadFunc.getNext();
+       return tupleFromDisk != null;
+     }
+
+     @Override
+     public float getProgress() throws IOException, InterruptedException {
+       return 0;
+     }
+
+  }
+}
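
A minimal sketch of the per-field conversion LoadFuncBasedRecordReader.getCurrentValue
performs: PigStorage hands back raw DataByteArray bytes, and its LoadCaster (the
standard UTF-8 caster) turns them into the Java type named by each ResourceFieldSchema.
The literal values are made up.

import org.apache.pig.LoadCaster;
import org.apache.pig.builtin.PigStorage;

public class LoadCasterSketch {
  public static void main(String[] args) throws Exception {
    LoadCaster caster = new PigStorage().getLoadCaster();
    Integer i = caster.bytesToInteger("42".getBytes());         // DataType.INTEGER branch
    Double d = caster.bytesToDouble("3.14".getBytes());         // DataType.DOUBLE branch
    String s = caster.bytesToCharArray("hcatalog".getBytes());  // DataType.CHARARRAY branch
    System.out.println(i + " " + d + " " + s);                  // 42 3.14 hcatalog
  }
}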

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java?rev=1091509&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java Tue Apr 12 17:30:08 2011
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig.drivers;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.pig.builtin.PigStorage;
+
+public class PigStorageInputDriver extends LoadFuncBasedInputDriver {
+
+  public static final String delim = "hcat.pigstorage.delim";
+
+  @Override
+  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
+
+    lf = storageDriverArgs.containsKey(delim) ?
+        new PigStorage(storageDriverArgs.getProperty(delim)) : new PigStorage();
+    super.initialize(context, storageDriverArgs);
+  }
+}
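
The only storage-driver argument PigStorageInputDriver reads is the field delimiter.
Below is a small sketch of the property it looks for; how such properties actually
reach initialize() is handled by the HCatalog input machinery and is not shown in
this file, so the wiring here is illustrative only.

import java.util.Properties;

import org.apache.hcatalog.pig.drivers.PigStorageInputDriver;

public class PigStorageDelimSketch {
  public static void main(String[] args) {
    Properties storageDriverArgs = new Properties();
    // PigStorageInputDriver.delim is the constant "hcat.pigstorage.delim".
    storageDriverArgs.setProperty(PigStorageInputDriver.delim, "\t");
    // With this key present, initialize() wraps new PigStorage("\t"); without it,
    // the driver falls back to PigStorage's default delimiter.
    System.out.println(storageDriverArgs);
  }
}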


