incubator-hcatalog-commits mailing list archives

From macy...@apache.org
Subject svn commit: r1099539 [1/3] - in /incubator/hcatalog/trunk/src: java/org/apache/hcatalog/mapreduce/ java/org/apache/hcatalog/pig/ test/org/apache/hcatalog/mapreduce/ test/org/apache/hcatalog/pig/
Date Wed, 04 May 2011 17:50:43 GMT
Author: macyang
Date: Wed May  4 17:50:42 2011
New Revision: 1099539

URL: http://svn.apache.org/viewvc?rev=1099539&view=rev
Log:
HCATALOG:16 from https://issues.apache.org/jira/secure/attachment/12478148/exim.mr.pig.patch
(Krishna via Mac)

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximLoader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximLoader.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatEximStorer.java
Modified:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java Wed May  4 17:50:42 2011
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+public abstract class HCatBaseInputFormat extends InputFormat<WritableComparable, HCatRecord> {
+  
+  /**
+   * Gets the schema for the HowlRecord data returned by HowlInputFormat: the
+   * consolidated output schema if one was set via setOutputSchema, otherwise
+   * the table schema.
+   *
+   * @param context the job context object
+   * @return the output schema
+   * @throws Exception if the job information is not available in the context
+   */
+  public static HCatSchema getOutputSchema(JobContext context) throws Exception {
+    String os = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+    if (os == null) {
+      return getTableSchema(context);
+    } else {
+      return (HCatSchema) HCatUtil.deserialize(os);
+    }
+  }
+  
+  /**
+   * Set the schema for the HowlRecord data returned by HowlInputFormat.
+   * @param job the job object
+   * @param hcatSchema the schema to use as the consolidated schema
+   */
+  public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception {
+    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
+  }
+
+
+  /**
+   * Logically split the set of input files for the job. Returns the
+   * underlying InputFormat's splits
+   * @param jobContext the job context object
+   * @return the splits, a HowlInputSplit wrapper over the storage
+   *         driver InputSplits
+   * @throws IOException or InterruptedException
+   */
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext)
+  throws IOException, InterruptedException {
+
+    //Get the job info from the configuration,
+    //throws exception if not initialized
+    JobInfo jobInfo;
+    try {
+      jobInfo = getJobInfo(jobContext);
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    List<PartInfo> partitionInfoList = jobInfo.getPartitions();
+    if(partitionInfoList == null ) {
+      //No partitions match the specified partition filter
+      return splits;
+    }
+
+    //For each matching partition, call getSplits on the underlying InputFormat
+    for(PartInfo partitionInfo : partitionInfoList) {
+      Job localJob = new Job(jobContext.getConfiguration());
+      HCatInputStorageDriver storageDriver;
+      try {
+        storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+
+      //Pass all required information to the storage driver
+      initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema());
+
+      //Get the input format for the storage driver
+      InputFormat inputFormat =
+        storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+
+      //Call getSplits on the storage driver's InputFormat, create an
+      //HCatSplit for each underlying split
+      List<InputSplit> baseSplits = inputFormat.getSplits(localJob);
+
+      for(InputSplit split : baseSplits) {
+        splits.add(new HCatSplit(
+            partitionInfo,
+            split,
+            jobInfo.getTableSchema()));
+      }
+    }
+
+    return splits;
+  }
+
+  /**
+   * Create the RecordReader for the given InputSplit. Returns the underlying
+   * RecordReader if the required operations are supported and the schema matches
+   * the HowlTable schema. Returns a HowlRecordReader if operations need to
+   * be implemented in Howl.
+   * @param split the split
+   * @param taskContext the task attempt context
+   * @return the record reader instance, either a HowlRecordReader (later) or
+   *         the underlying storage driver's RecordReader
+   * @throws IOException or InterruptedException
+   */
+  @Override
+  public RecordReader<WritableComparable, HCatRecord> createRecordReader(InputSplit split,
+      TaskAttemptContext taskContext) throws IOException, InterruptedException {
+
+    HCatSplit howlSplit = (HCatSplit) split;
+    PartInfo partitionInfo = howlSplit.getPartitionInfo();
+
+    //If running through a Pig job, the JobInfo will not be available in the
+    //backend process context (since HowlLoader works on a copy of the JobContext and does
+    //not call HowlInputFormat.setInput in the backend process).
+    //So this function should NOT attempt to read the JobInfo.
+
+    HCatInputStorageDriver storageDriver;
+    try {
+      storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+
+    //Pass all required information to the storage driver
+    initStorageDriver(storageDriver, taskContext, partitionInfo, howlSplit.getTableSchema());
+
+    //Get the input format for the storage driver
+    InputFormat inputFormat =
+      storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
+
+      //Create the underlying input format's record reader and a Howl wrapper
+    RecordReader recordReader =
+      inputFormat.createRecordReader(howlSplit.getBaseSplit(), taskContext);
+
+    return new HCatRecordReader(storageDriver,recordReader);
+  }
+
+  /**
+   * Gets the HowlTable schema for the table specified in the HowlInputFormat.setInput call
+   * on the specified job context. This information is available only after HowlInputFormat.setInput
+   * has been called for a JobContext.
+   * @param context the context
+   * @return the table schema
+   * @throws Exception if HowlInputFormat.setInput has not been called for the current context
+   */
+  public static HCatSchema getTableSchema(JobContext context) throws Exception {
+    JobInfo jobInfo = getJobInfo(context);
+    return jobInfo.getTableSchema();
+  }
+
+  /**
+   * Gets the JobInfo object by reading the Configuration and deserializing
+   * the string. If JobInfo is not present in the configuration, throws an
+   * exception since that means HowlInputFormat.setInput has not been called.
+   * @param jobContext the job context
+   * @return the JobInfo object
+   * @throws Exception the exception
+   */
+  private static JobInfo getJobInfo(JobContext jobContext) throws Exception {
+    String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+    if( jobString == null ) {
+      throw new Exception("job information not found in JobContext. HowlInputFormat.setInput() not called?");
+    }
+
+    return (JobInfo) HCatUtil.deserialize(jobString);
+  }
+
+
+  /**
+   * Initializes the storage driver instance. Passes on the required
+   * schema information, path info and arguments for the supported
+   * features to the storage driver.
+   * @param storageDriver the storage driver
+   * @param context the job context
+   * @param partitionInfo the partition info
+   * @param tableSchema the table level schema
+   * @throws IOException Signals that an I/O exception has occurred.
+   */
+  private void initStorageDriver(HCatInputStorageDriver storageDriver,
+      JobContext context, PartInfo partitionInfo,
+      HCatSchema tableSchema) throws IOException {
+
+    storageDriver.setInputPath(context, partitionInfo.getLocation());
+
+    if( partitionInfo.getPartitionSchema() != null ) {
+      storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
+    }
+
+    storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues());
+
+    //Set the output schema. Use the schema given by user if set, otherwise use the
+    //table level schema
+    HCatSchema outputSchema = null;
+    String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
+    if( outputSchemaString != null ) {
+      outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString);
+    } else {
+      outputSchema = tableSchema;
+    }
+
+    storageDriver.setOutputSchema(context, outputSchema);
+
+    storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties());
+  }
+
+  /**
+   * Gets the input driver instance.
+   * @param inputStorageDriverClass the input storage driver classname
+   * @return the input driver instance
+   * @throws Exception
+   */
+  @SuppressWarnings("unchecked")
+  private HCatInputStorageDriver getInputDriverInstance(
+      String inputStorageDriverClass) throws Exception {
+    try {
+      Class<? extends HCatInputStorageDriver> driverClass =
+        (Class<? extends HCatInputStorageDriver>)
+        Class.forName(inputStorageDriverClass);
+      return driverClass.newInstance();
+    } catch(Exception e) {
+      throw new Exception("error creating storage driver " +
+          inputStorageDriverClass, e);
+    }
+  }
+
+}
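
For reference, a minimal driver-side sketch of the projection hooks above (a sketch only: it assumes setInput() has already been called on the job, and the column name "viewtime" is hypothetical):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;

public class ProjectionSketch {
  public static void project(Job job) throws Exception {
    // Table-level schema is available once setInput() has populated the job conf.
    HCatSchema schema = HCatBaseInputFormat.getTableSchema(job);
    // Drop a column the job does not need ("viewtime" is a made-up name).
    schema.remove(schema.get("viewtime"));
    // Readers created by this input format will now return the narrowed schema.
    HCatBaseInputFormat.setOutputSchema(job, schema);
  }
}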

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputCommitter.java Wed May  4 17:50:42 2011
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus.State;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatException;
+
+public abstract class HCatBaseOutputCommitter extends OutputCommitter {
+
+  /** The underlying output committer */
+  protected final OutputCommitter baseCommitter;
+
+  public HCatBaseOutputCommitter(OutputCommitter baseCommitter) {
+    this.baseCommitter = baseCommitter;
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+      baseCommitter.abortTask(context);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+      baseCommitter.commitTask(context);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+      return baseCommitter.needsTaskCommit(context);
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    if( baseCommitter != null ) {
+      baseCommitter.setupJob(context);
+    }
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+      baseCommitter.setupTask(context);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, State state) throws IOException {
+    if(baseCommitter != null) {
+      baseCommitter.abortJob(jobContext, state);
+    }
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+
+    doAbortJob(jobContext, jobInfo);
+
+    Path src = new Path(jobInfo.getLocation());
+    FileSystem fs = src.getFileSystem(jobContext.getConfiguration());
+    fs.delete(src, true);
+  }
+
+  protected void doAbortJob(JobContext jobContext, OutputJobInfo jobInfo) throws HCatException {
+  }
+
+  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+  private static boolean getOutputDirMarking(Configuration conf) {
+    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
+                           false);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    if(baseCommitter != null) {
+      baseCommitter.commitJob(jobContext);
+    }
+    // create _SUCCESS FILE if so requested.
+    OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+    if(getOutputDirMarking(jobContext.getConfiguration())) {
+      Path outputPath = new Path(jobInfo.getLocation());
+      if (outputPath != null) {
+        FileSystem fileSys = outputPath.getFileSystem(jobContext.getConfiguration());
+        // create a file in the folder to mark it
+        if (fileSys.exists(outputPath)) {
+          Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+          if(!fileSys.exists(filePath)) { // may have been created by baseCommitter.commitJob()
+            fileSys.create(filePath).close();
+          }
+        }
+      }
+    }
+    cleanupJob(jobContext);
+  }
+}
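
A note on the marker logic above: commitJob() only writes the _SUCCESS file when the marker flag is set, and getOutputDirMarking() defaults it to false. A minimal sketch of opting in from a driver (no new API assumed):

import org.apache.hadoop.conf.Configuration;

public class SuccessMarkerSketch {
  public static void enable(Configuration conf) {
    // Same key as SUCCESSFUL_JOB_OUTPUT_DIR_MARKER above; defaults to false.
    conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
  }
}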

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatBaseOutputFormat.java Wed May  4 17:50:42 2011
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+
+public abstract class HCatBaseOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
+
+  /**
+   * Gets the table schema for the table specified in the HowlOutputFormat.setOutput call
+   * on the specified job context.
+   * @param context the context
+   * @return the table schema
+   * @throws IOException if HowlOutputFormat.setOutput has not been called for the passed context
+   */
+  public static HCatSchema getTableSchema(JobContext context) throws IOException {
+      OutputJobInfo jobInfo = getJobInfo(context);
+      return jobInfo.getTableSchema();
+  }
+
+  /**
+   * Check for validity of the output-specification for the job.
+   * @param context information about the job
+   * @throws IOException when output should not be attempted
+   */
+  @Override
+  public void checkOutputSpecs(JobContext context
+                                        ) throws IOException, InterruptedException {
+      OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
+      outputFormat.checkOutputSpecs(context);
+  }
+
+  /**
+   * Gets the output format instance.
+   * @param context the job context
+   * @return the output format instance
+   * @throws IOException
+   */
+  protected OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat(JobContext context) throws IOException {
+      OutputJobInfo jobInfo = getJobInfo(context);
+      HCatOutputStorageDriver  driver = getOutputDriverInstance(context, jobInfo);
+
+      OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat =
+            driver.getOutputFormat();
+      return outputFormat;
+  }
+
+  /**
+   * Gets the HowlOutputJobInfo object by reading the Configuration and deserializing
+   * the string. If JobInfo is not present in the configuration, throws an
+   * exception since that means HowlOutputFormat.setOutput has not been called.
+   * @param jobContext the job context
+   * @return the OutputJobInfo object
+   * @throws IOException the IO exception
+   */
+  static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
+      String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+      if( jobString == null ) {
+          throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
+      }
+
+      return (OutputJobInfo) HCatUtil.deserialize(jobString);
+  }
+
+  /**
+   * Gets the output storage driver instance.
+   * @param jobContext the job context
+   * @param jobInfo the output job info
+   * @return the output driver instance
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  static HCatOutputStorageDriver getOutputDriverInstance(
+          JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
+      try {
+          Class<? extends HCatOutputStorageDriver> driverClass =
+              (Class<? extends HCatOutputStorageDriver>)
+              Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
+          HCatOutputStorageDriver driver = driverClass.newInstance();
+
+          //Initialize the storage driver
+          driver.setSchema(jobContext, jobInfo.getOutputSchema());
+          driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues());
+          driver.setOutputPath(jobContext, jobInfo.getLocation());
+
+          driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
+
+          return driver;
+      } catch(Exception e) {
+          throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e);
+      }
+  }
+
+  protected static void setPartDetails(OutputJobInfo jobInfo, final HCatSchema schema,
+      Map<String, String> partMap) throws HCatException, IOException {
+    List<Integer> posOfPartCols = new ArrayList<Integer>();
+
+    // If partition columns occur in data, we want to remove them.
+    // So, find out positions of partition columns in schema provided by user.
+    // We also need to update the output Schema with these deletions.
+
+    // Note that output storage drivers never see partition columns in the data
+    // or the schema.
+
+    HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields());
+    for(String partKey : partMap.keySet()){
+      Integer idx;
+      if((idx = schema.getPosition(partKey)) != null){
+        posOfPartCols.add(idx);
+        schemaWithoutParts.remove(schema.get(partKey));
+      }
+    }
+    HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
+    jobInfo.setPosOfPartCols(posOfPartCols);
+    jobInfo.setOutputSchema(schemaWithoutParts);
+  }
+}
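
To restate the setPartDetails() rule above in isolation: any partition key that appears in the user-supplied schema is dropped before the storage driver sees it. A standalone sketch of that narrowing, using only the HCatSchema calls shown above:

import java.util.Map;

import org.apache.hcatalog.data.schema.HCatSchema;

public class PartitionStripSketch {
  // Returns a copy of the schema with any partition key columns removed.
  public static HCatSchema withoutPartitionColumns(HCatSchema schema,
      Map<String, String> partMap) throws Exception {
    HCatSchema narrowed = new HCatSchema(schema.getFields());
    for (String partKey : partMap.keySet()) {
      if (schema.getPosition(partKey) != null) {
        narrowed.remove(schema.get(partKey));
      }
    }
    return narrowed;
  }
}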

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximInputFormat.java Wed May  4 17:50:42 2011
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+/** The InputFormat to use to read data exported from Howl (e.g. by HCatEximOutputFormat), without requiring a Howl server. */
+public class HCatEximInputFormat extends HCatBaseInputFormat {
+
+  /**
+   * Set the input to use for the Job. This reads the _metadata file at the
+   * export location, selects the partitions matching the specified partition
+   * filter, and puts the resulting job information in the conf object.
+   *
+   * @param job
+   *          the job object
+   * @param location
+   *          the location of the exported data (the directory containing the _metadata file)
+   * @param partitionFilter
+   *          partition key/value constraints used to select partitions; may be null
+   * @return two howl schemas, for the table columns and the partition keys
+   * @throws IOException
+   *           the exception in reading the metadata file
+   */
+  public static List<HCatSchema> setInput(Job job,
+      String location,
+      Map<String, String> partitionFilter) throws IOException {
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(new URI(location), job.getConfiguration());
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    Path fromPath = new Path(location);
+    Path metadataPath = new Path(fromPath, "_metadata");
+    try {
+      Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> tp = EximUtil
+      .readMetaData(fs, metadataPath);
+      org.apache.hadoop.hive.metastore.api.Table table = tp.getKey();
+      HCatTableInfo inputInfo = HCatTableInfo.getInputTableInfo(null,
+          null, table.getDbName(), table.getTableName());
+      List<FieldSchema> partCols = table.getPartitionKeys();
+      List<PartInfo> partInfoList = null;
+      if (partCols.size() > 0) {
+        List<String> partColNames = new ArrayList<String>(partCols.size());
+        for (FieldSchema fsc : partCols) {
+          partColNames.add(fsc.getName());
+        }
+        List<Partition> partitions = tp.getValue();
+        partInfoList = filterPartitions(partitionFilter, partitions, table.getPartitionKeys());
+      } else {
+        partInfoList = new ArrayList<PartInfo>(1);
+        HCatSchema schema = new HCatSchema(HCatUtil.getHCatFieldSchemaList(table.getSd().getCols()));
+        Map<String,String> parameters = table.getParameters();
+        String inputStorageDriverClass = null;
+        if (parameters.containsKey(HCatConstants.HCAT_ISD_CLASS)){
+          inputStorageDriverClass = parameters.get(HCatConstants.HCAT_ISD_CLASS);
+        }else{
+          throw new IOException("No input storage driver classname found, cannot read partition");
+        }
+        Properties howlProperties = new Properties();
+        for (String key : parameters.keySet()){
+          if (key.startsWith(InitializeInput.HCAT_KEY_PREFIX)){
+            howlProperties.put(key, parameters.get(key));
+          }
+        }
+        PartInfo partInfo = new PartInfo(schema, inputStorageDriverClass,  location + "/data", howlProperties);
+        partInfoList.add(partInfo);
+      }
+      JobInfo howlJobInfo = new JobInfo(inputInfo,
+          HCatUtil.getTableSchemaWithPtnCols(table), partInfoList);
+      job.getConfiguration().set(
+          HCatConstants.HCAT_KEY_JOB_INFO,
+          HCatUtil.serialize(howlJobInfo));
+      List<HCatSchema> rv = new ArrayList<HCatSchema>(2);
+      rv.add(HCatSchemaUtils.getHCatSchema(table.getSd().getCols()));
+      rv.add(HCatSchemaUtils.getHCatSchema(partCols));
+      return rv;
+    } catch(SemanticException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private static List<PartInfo> filterPartitions(Map<String, String> partitionFilter,
+      List<Partition> partitions, List<FieldSchema> partCols) throws IOException {
+    List<PartInfo> partInfos = new LinkedList<PartInfo>();
+    for (Partition partition : partitions) {
+      boolean matches = true;
+      List<String> partVals = partition.getValues();
+      assert partCols.size() == partVals.size();
+      Map<String, String> partSpec = EximUtil.makePartSpec(partCols, partVals);
+      if (partitionFilter != null) {
+        for (Map.Entry<String, String> constraint : partitionFilter.entrySet()) {
+          String partVal = partSpec.get(constraint.getKey());
+          if ((partVal == null) || !partVal.equals(constraint.getValue())) {
+            matches = false;
+            break;
+          }
+        }
+      }
+      if (matches) {
+        PartInfo partInfo = InitializeInput.extractPartInfo(partition.getSd(),
+            partition.getParameters());
+        partInfo.setPartitionValues(partSpec);
+        partInfos.add(partInfo);
+      }
+    }
+    return partInfos;
+  }
+}
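
A minimal sketch of reading an exported dataset with the class above (the export path and the partition constraint "part" = "p1" are hypothetical):

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatEximInputFormat;

public class EximReadSketch {
  public static void configure(Job job) throws Exception {
    // Keep only partitions whose "part" key equals "p1" (made-up values).
    Map<String, String> filter = new HashMap<String, String>();
    filter.put("part", "p1");
    List<HCatSchema> schemas = HCatEximInputFormat.setInput(
        job, "hdfs:///tmp/export/employee", filter);
    HCatSchema columns = schemas.get(0);        // table columns
    HCatSchema partitionKeys = schemas.get(1);  // partition keys
    job.setInputFormatClass(HCatEximInputFormat.class);
  }
}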

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputCommitter.java Wed May  4 17:50:42 2011
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatException;
+
+public class HCatEximOutputCommitter extends HCatBaseOutputCommitter {
+
+  private static final Log LOG = LogFactory.getLog(HCatEximOutputCommitter.class);
+
+  public HCatEximOutputCommitter(OutputCommitter baseCommitter) {
+    super(baseCommitter);
+  }
+
+  @Override
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    LOG.info("HowlEximOutputCommitter.cleanup invoked; m.o.d : " +
+        jobContext.getConfiguration().get("mapred.output.dir"));
+    if (baseCommitter != null) {
+      LOG.info("baseCommitter.class = " + baseCommitter.getClass().getName());
+      baseCommitter.cleanupJob(jobContext);
+    }
+
+    OutputJobInfo jobInfo = HCatBaseOutputFormat.getJobInfo(jobContext);
+    Configuration conf = jobContext.getConfiguration();
+    FileSystem fs;
+    try {
+      fs = FileSystem.get(new URI(jobInfo.getTable().getSd().getLocation()), conf);
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+    doCleanup(jobInfo, fs);
+  }
+
+  private static void doCleanup(OutputJobInfo jobInfo, FileSystem fs) throws IOException,
+      HCatException {
+    try {
+      Table ttable = jobInfo.getTable();
+      org.apache.hadoop.hive.ql.metadata.Table table = new org.apache.hadoop.hive.ql.metadata.Table(
+          ttable);
+      StorageDescriptor tblSD = ttable.getSd();
+      Path tblPath = new Path(tblSD.getLocation());
+      Path path = new Path(tblPath, "_metadata");
+      List<Partition> tpartitions = null;
+      try {
+        Map.Entry<org.apache.hadoop.hive.metastore.api.Table, List<Partition>> rv = EximUtil
+            .readMetaData(fs, path);
+        tpartitions = rv.getValue();
+      } catch (IOException e) {
+        // No existing _metadata at this location (e.g. first export); proceed with no prior partitions.
+      }
+      List<org.apache.hadoop.hive.ql.metadata.Partition> partitions =
+        new ArrayList<org.apache.hadoop.hive.ql.metadata.Partition>();
+      if (tpartitions != null) {
+        for (Partition tpartition : tpartitions) {
+          partitions.add(new org.apache.hadoop.hive.ql.metadata.Partition(table, tpartition));
+        }
+      }
+      if (!table.getPartitionKeys().isEmpty()) {
+        Map<String, String> partitionValues = jobInfo.getTableInfo().getPartitionValues();
+        org.apache.hadoop.hive.ql.metadata.Partition partition =
+            new org.apache.hadoop.hive.ql.metadata.Partition(table,
+                partitionValues,
+                new Path(tblPath, Warehouse.makePartPath(partitionValues)));
+        partition.getTPartition().setParameters(table.getParameters());
+        partitions.add(partition);
+      }
+      EximUtil.createExportDump(fs, path, (table), partitions);
+    } catch (SemanticException e) {
+      throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+    } catch (HiveException e) {
+      throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+    } catch (MetaException e) {
+      throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, e);
+    }
+  }
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatEximOutputFormat.java Wed May  4 17:50:42 2011
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.rcfile.RCFileInputDriver;
+import org.apache.hcatalog.rcfile.RCFileOutputDriver;
+
+/**
+ * The OutputFormat to use to write data to Howl without a howl server. This can then
+ * be imported into a howl instance, or used with a HowlEximInputFormat. As in
+ * HowlOutputFormat, the key value is ignored and should be given as null.
+ * The value is the HowlRecord to write.
+ */
+public class HCatEximOutputFormat extends HCatBaseOutputFormat {
+
+  private static final Log LOG = LogFactory.getLog(HCatEximOutputFormat.class);
+
+  /**
+   * Get the record writer for the job. Uses the Table's default OutputStorageDriver
+   * to get the record writer.
+   *
+   * @param context
+   *          the information about the current task.
+   * @return a RecordWriter to write the output for the job.
+   * @throws IOException
+   */
+  @Override
+  public RecordWriter<WritableComparable<?>, HCatRecord>
+      getRecordWriter(TaskAttemptContext context
+                      ) throws IOException, InterruptedException {
+    HCatRecordWriter rw = new HCatRecordWriter(context);
+    return rw;
+  }
+
+  /**
+   * Get the output committer for this output format. This is responsible
+   * for ensuring the output is committed correctly.
+   * @param context the task context
+   * @return an output committer
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
+      OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
+      return new HCatEximOutputCommitter(outputFormat.getOutputCommitter(context));
+  }
+
+  public static void setOutput(Job job, String dbname, String tablename, String location,
+      HCatSchema partitionSchema, List<String> partitionValues, HCatSchema columnSchema) throws HCatException {
+    setOutput(job, dbname, tablename, location, partitionSchema, partitionValues, columnSchema,
+          RCFileInputDriver.class.getName(),
+          RCFileOutputDriver.class.getName(),
+          RCFileInputFormat.class.getName(),
+          RCFileOutputFormat.class.getName(),
+          ColumnarSerDe.class.getName());
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void setOutput(Job job, String dbname, String tablename, String location,
+          HCatSchema partitionSchema,
+          List<String> partitionValues,
+          HCatSchema columnSchema,
+          String isdname, String osdname,
+          String ifname, String ofname,
+          String serializationLib) throws HCatException {
+    Map<String, String> partSpec = new TreeMap<String, String>();
+    List<HCatFieldSchema> partKeys = null;
+    if (partitionSchema != null) {
+      partKeys = partitionSchema.getFields();
+      if (partKeys.size() != partitionValues.size()) {
+        throw new IllegalArgumentException("Partition key size differs from partition value size");
+      }
+      for (int i = 0; i < partKeys.size(); ++i) {
+        HCatFieldSchema partKey = partKeys.get(i);
+        if (partKey.getType() != HCatFieldSchema.Type.STRING) {
+          throw new IllegalArgumentException("Partition key type string is only supported");
+        }
+        partSpec.put(partKey.getName(), partitionValues.get(i));
+      }
+    }
+    StorerInfo storerInfo = new StorerInfo(isdname, osdname, new Properties());
+    HCatTableInfo outputInfo = HCatTableInfo.getOutputTableInfo(null, null, dbname, tablename,
+        partSpec);
+    org.apache.hadoop.hive.ql.metadata.Table tbl = new
+      org.apache.hadoop.hive.ql.metadata.Table(dbname, tablename);
+    Table table = tbl.getTTable();
+    table.getParameters().put(HCatConstants.HCAT_ISD_CLASS, isdname);
+    table.getParameters().put(HCatConstants.HCAT_OSD_CLASS, osdname);
+    try {
+      String partname = null;
+      if ((partKeys != null) && !partKeys.isEmpty()) {
+        table.setPartitionKeys(HCatSchemaUtils.getFieldSchemas(partKeys));
+        partname = Warehouse.makePartPath(partSpec);
+      } else {
+        partname = "data";
+      }
+      StorageDescriptor sd = table.getSd();
+      sd.setLocation(location);
+      String dataLocation = location + "/" + partname;
+      OutputJobInfo jobInfo = new OutputJobInfo(outputInfo,
+          columnSchema, columnSchema, storerInfo, dataLocation, table);
+      setPartDetails(jobInfo, columnSchema, partSpec);
+      sd.setCols(HCatUtil.getFieldSchemaList(jobInfo.getOutputSchema().getFields()));
+      sd.setInputFormat(ifname);
+      sd.setOutputFormat(ofname);
+      SerDeInfo serdeInfo = sd.getSerdeInfo();
+      serdeInfo.setSerializationLib(serializationLib);
+      Configuration conf = job.getConfiguration();
+      conf.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
+    } catch (IOException e) {
+      throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+    } catch (MetaException e) {
+      throw new HCatException(ErrorType.ERROR_SET_OUTPUT, e);
+    }
+  }
+}
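
And the writing side: a minimal sketch of the seven-argument setOutput() above (the database and table names, export location, and single partition value are hypothetical, and columnSchema/partitionSchema are assumed to be built by the caller):

import java.util.Arrays;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatEximOutputFormat;

public class EximWriteSketch {
  public static void configure(Job job, HCatSchema columnSchema,
      HCatSchema partitionSchema) throws Exception {
    // Writes an export under /tmp/export/employee for partition value "p1" (made-up).
    HCatEximOutputFormat.setOutput(job, "default", "employee",
        "hdfs:///tmp/export/employee",
        partitionSchema, Arrays.asList("p1"), columnSchema);
    job.setOutputFormatClass(HCatEximOutputFormat.class);
    // Tasks then emit (null, HCatRecord) pairs, per the class comment above.
  }
}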

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatInputFormat.java Wed May  4 17:50:42 2011
@@ -19,23 +19,11 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
 
 /** The InputFormat to use to read data from Howl */
-public class HCatInputFormat extends InputFormat<WritableComparable, HCatRecord> {
+public class HCatInputFormat extends HCatBaseInputFormat {
 
   /**
    * Set the input to use for the Job. This queries the metadata server with
@@ -55,206 +43,5 @@ public class HCatInputFormat extends Inp
     }
   }
 
-  /**
-   * Set the schema for the HowlRecord data returned by HowlInputFormat.
-   * @param job the job object
-   * @param hcatSchema the schema to use as the consolidated schema
-   */
-  public static void setOutputSchema(Job job,HCatSchema hcatSchema) throws Exception {
-    job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA, HCatUtil.serialize(hcatSchema));
-  }
-
-
-  /**
-   * Logically split the set of input files for the job. Returns the
-   * underlying InputFormat's splits
-   * @param jobContext the job context object
-   * @return the splits, an HowlInputSplit wrapper over the storage
-   *         driver InputSplits
-   * @throws IOException or InterruptedException
-   */
-  @Override
-  public List<InputSplit> getSplits(JobContext jobContext)
-  throws IOException, InterruptedException {
-
-    //Get the job info from the configuration,
-    //throws exception if not initialized
-    JobInfo jobInfo;
-    try {
-      jobInfo = getJobInfo(jobContext);
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    List<InputSplit> splits = new ArrayList<InputSplit>();
-    List<PartInfo> partitionInfoList = jobInfo.getPartitions();
-    if(partitionInfoList == null ) {
-      //No partitions match the specified partition filter
-      return splits;
-    }
-
-    //For each matching partition, call getSplits on the underlying InputFormat
-    for(PartInfo partitionInfo : partitionInfoList) {
-      Job localJob = new Job(jobContext.getConfiguration());
-      HCatInputStorageDriver storageDriver;
-      try {
-        storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
-      } catch (Exception e) {
-        throw new IOException(e);
-      }
-
-      //Pass all required information to the storage driver
-      initStorageDriver(storageDriver, localJob, partitionInfo, jobInfo.getTableSchema());
-
-      //Get the input format for the storage driver
-      InputFormat inputFormat =
-        storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
-
-      //Call getSplit on the storage drivers InputFormat, create an
-      //HCatSplit for each underlying split
-      List<InputSplit> baseSplits = inputFormat.getSplits(localJob);
-
-      for(InputSplit split : baseSplits) {
-        splits.add(new HCatSplit(
-            partitionInfo,
-            split,
-            jobInfo.getTableSchema()));
-      }
-    }
-
-    return splits;
-  }
-
-  /**
-   * Create the RecordReader for the given InputSplit. Returns the underlying
-   * RecordReader if the required operations are supported and schema matches
-   * with HowlTable schema. Returns an HowlRecordReader if operations need to
-   * be implemented in Howl.
-   * @param split the split
-   * @param taskContext the task attempt context
-   * @return the record reader instance, either an HowlRecordReader(later) or
-   *         the underlying storage driver's RecordReader
-   * @throws IOException or InterruptedException
-   */
-  @Override
-  public RecordReader<WritableComparable, HCatRecord> createRecordReader(InputSplit split,
-      TaskAttemptContext taskContext) throws IOException, InterruptedException {
-
-    HCatSplit howlSplit = (HCatSplit) split;
-    PartInfo partitionInfo = howlSplit.getPartitionInfo();
-
-    //If running through a Pig job, the JobInfo will not be available in the
-    //backend process context (since HowlLoader works on a copy of the JobContext and does
-    //not call HowlInputFormat.setInput in the backend process).
-    //So this function should NOT attempt to read the JobInfo.
-
-    HCatInputStorageDriver storageDriver;
-    try {
-      storageDriver = getInputDriverInstance(partitionInfo.getInputStorageDriverClass());
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-
-    //Pass all required information to the storage driver
-    initStorageDriver(storageDriver, taskContext, partitionInfo, howlSplit.getTableSchema());
-
-    //Get the input format for the storage driver
-    InputFormat inputFormat =
-      storageDriver.getInputFormat(partitionInfo.getInputStorageDriverProperties());
-
-    //Create the underlying input formats record record and an Howl wrapper
-    RecordReader recordReader =
-      inputFormat.createRecordReader(howlSplit.getBaseSplit(), taskContext);
-
-    return new HCatRecordReader(storageDriver,recordReader);
-  }
-
-  /**
-   * Gets the HowlTable schema for the table specified in the HowlInputFormat.setInput call
-   * on the specified job context. This information is available only after HowlInputFormat.setInput
-   * has been called for a JobContext.
-   * @param context the context
-   * @return the table schema
-   * @throws Exception if HowlInputFromat.setInput has not been called for the current context
-   */
-  public static HCatSchema getTableSchema(JobContext context) throws Exception {
-    JobInfo jobInfo = getJobInfo(context);
-    return jobInfo.getTableSchema();
-  }
-
-  /**
-   * Gets the JobInfo object by reading the Configuration and deserializing
-   * the string. If JobInfo is not present in the configuration, throws an
-   * exception since that means HowlInputFormat.setInput has not been called.
-   * @param jobContext the job context
-   * @return the JobInfo object
-   * @throws Exception the exception
-   */
-  private static JobInfo getJobInfo(JobContext jobContext) throws Exception {
-    String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
-    if( jobString == null ) {
-      throw new Exception("job information not found in JobContext. HowlInputFormat.setInput() not called?");
-    }
-
-    return (JobInfo) HCatUtil.deserialize(jobString);
-  }
-
-
-  /**
-   * Initializes the storage driver instance. Passes on the required
-   * schema information, path info and arguments for the supported
-   * features to the storage driver.
-   * @param storageDriver the storage driver
-   * @param context the job context
-   * @param partitionInfo the partition info
-   * @param tableSchema the table level schema
-   * @throws IOException Signals that an I/O exception has occurred.
-   */
-  private void initStorageDriver(HCatInputStorageDriver storageDriver,
-      JobContext context, PartInfo partitionInfo,
-      HCatSchema tableSchema) throws IOException {
-
-    storageDriver.setInputPath(context, partitionInfo.getLocation());
-
-    if( partitionInfo.getPartitionSchema() != null ) {
-      storageDriver.setOriginalSchema(context, partitionInfo.getPartitionSchema());
-    }
-
-    storageDriver.setPartitionValues(context, partitionInfo.getPartitionValues());
-
-    //Set the output schema. Use the schema given by user if set, otherwise use the
-    //table level schema
-    HCatSchema outputSchema = null;
-    String outputSchemaString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA);
-    if( outputSchemaString != null ) {
-      outputSchema = (HCatSchema) HCatUtil.deserialize(outputSchemaString);
-    } else {
-      outputSchema = tableSchema;
-    }
-
-    storageDriver.setOutputSchema(context, outputSchema);
-
-    storageDriver.initialize(context, partitionInfo.getInputStorageDriverProperties());
-  }
-
-  /**
-   * Gets the input driver instance.
-   * @param inputStorageDriverClass the input storage driver classname
-   * @return the input driver instance
-   * @throws Exception
-   */
-  @SuppressWarnings("unchecked")
-  private HCatInputStorageDriver getInputDriverInstance(
-      String inputStorageDriverClass) throws Exception {
-    try {
-      Class<? extends HCatInputStorageDriver> driverClass =
-        (Class<? extends HCatInputStorageDriver>)
-        Class.forName(inputStorageDriverClass);
-      return driverClass.newInstance();
-    } catch(Exception e) {
-      throw new Exception("error creating storage driver " +
-          inputStorageDriverClass, e);
-    }
-  }
 
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatOutputFormat.java Wed May  4 17:50:42 2011
@@ -64,7 +64,7 @@ import org.apache.thrift.TException;
 
 /** The OutputFormat to use to write data to Howl. The key value is ignored and
  * and should be given as null. The value is the HowlRecord to write.*/
-public class HCatOutputFormat extends OutputFormat<WritableComparable<?>, HCatRecord> {
+public class HCatOutputFormat extends HCatBaseOutputFormat {
 
     /** The directory under which data is initially written for a non partitioned table */
     protected static final String TEMP_DIR_NAME = "_TEMP";
@@ -283,42 +283,11 @@ public class HCatOutputFormat extends Ou
 
         OutputJobInfo jobInfo = getJobInfo(job);
         Map<String,String> partMap = jobInfo.getTableInfo().getPartitionValues();
-        List<Integer> posOfPartCols = new ArrayList<Integer>();
-
-        // If partition columns occur in data, we want to remove them.
-        // So, find out positions of partition columns in schema provided by user.
-        // We also need to update the output Schema with these deletions.
-
-        // Note that, output storage drivers never sees partition columns in data
-        // or schema.
-
-        HCatSchema schemaWithoutParts = new HCatSchema(schema.getFields());
-        for(String partKey : partMap.keySet()){
-          Integer idx;
-          if((idx = schema.getPosition(partKey)) != null){
-            posOfPartCols.add(idx);
-            schemaWithoutParts.remove(schema.get(partKey));
-          }
-        }
-        HCatUtil.validatePartitionSchema(jobInfo.getTable(), schemaWithoutParts);
-        jobInfo.setPosOfPartCols(posOfPartCols);
-        jobInfo.setOutputSchema(schemaWithoutParts);
+        setPartDetails(jobInfo, schema, partMap);
         job.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(jobInfo));
     }
 
     /**
-     * Gets the table schema for the table specified in the HowlOutputFormat.setOutput call
-     * on the specified job context.
-     * @param context the context
-     * @return the table schema
-     * @throws IOException if HowlOutputFromat.setOutput has not been called for the passed context
-     */
-    public static HCatSchema getTableSchema(JobContext context) throws IOException {
-        OutputJobInfo jobInfo = getJobInfo(context);
-        return jobInfo.getTableSchema();
-    }
-
-    /**
      * Get the record writer for the job. Uses the Table's default OutputStorageDriver
      * to get the record writer.
      * @param context the information about the current task.
@@ -349,18 +318,6 @@ public class HCatOutputFormat extends Ou
     }
 
     /**
-     * Check for validity of the output-specification for the job.
-     * @param context information about the job
-     * @throws IOException when output should not be attempted
-     */
-    @Override
-    public void checkOutputSpecs(JobContext context
-                                          ) throws IOException, InterruptedException {
-        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat = getOutputFormat(context);
-        outputFormat.checkOutputSpecs(context);
-    }
-
-    /**
      * Get the output committer for this output format. This is responsible
      * for ensuring the output is committed correctly.
      * @param context the task context
@@ -375,68 +332,6 @@ public class HCatOutputFormat extends Ou
         return new HCatOutputCommitter(outputFormat.getOutputCommitter(context));
     }
 
-
-    /**
-     * Gets the output format instance.
-     * @param context the job context
-     * @return the output format instance
-     * @throws IOException
-     */
-    private OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat(JobContext context) throws IOException {
-        OutputJobInfo jobInfo = getJobInfo(context);
-        HCatOutputStorageDriver  driver = getOutputDriverInstance(context, jobInfo);
-
-        OutputFormat<? super WritableComparable<?>, ? super Writable> outputFormat =
-              driver.getOutputFormat();
-        return outputFormat;
-    }
-
-    /**
-     * Gets the HowlOuputJobInfo object by reading the Configuration and deserializing
-     * the string. If JobInfo is not present in the configuration, throws an
-     * exception since that means HowlOutputFormat.setOutput has not been called.
-     * @param jobContext the job context
-     * @return the OutputJobInfo object
-     * @throws IOException the IO exception
-     */
-    static OutputJobInfo getJobInfo(JobContext jobContext) throws IOException {
-        String jobString = jobContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        if( jobString == null ) {
-            throw new HCatException(ErrorType.ERROR_NOT_INITIALIZED);
-        }
-
-        return (OutputJobInfo) HCatUtil.deserialize(jobString);
-    }
-
-    /**
-     * Gets the output storage driver instance.
-     * @param jobContext the job context
-     * @param jobInfo the output job info
-     * @return the output driver instance
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    static HCatOutputStorageDriver getOutputDriverInstance(
-            JobContext jobContext, OutputJobInfo jobInfo) throws IOException {
-        try {
-            Class<? extends HCatOutputStorageDriver> driverClass =
-                (Class<? extends HCatOutputStorageDriver>)
-                Class.forName(jobInfo.getStorerInfo().getOutputSDClass());
-            HCatOutputStorageDriver driver = driverClass.newInstance();
-
-            //Initialize the storage driver
-            driver.setSchema(jobContext, jobInfo.getOutputSchema());
-            driver.setPartitionValues(jobContext, jobInfo.getTableInfo().getPartitionValues());
-            driver.setOutputPath(jobContext, jobInfo.getLocation());
-
-            driver.initialize(jobContext, jobInfo.getStorerInfo().getProperties());
-
-            return driver;
-        } catch(Exception e) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORAGE_DRIVER, e);
-        }
-    }
-
     static HiveMetaStoreClient createHiveClient(String url, Configuration conf) throws IOException, MetaException {
       HiveConf hiveConf = new HiveConf(HCatOutputFormat.class);
 

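A minimal sketch (not part of the patch) of the reducer side of a job that writes through HCatOutputFormat: as the class javadoc above notes, the key is ignored and should be passed as null, and the value is the record to write. The class name, the two-column layout (word string, cnt int) and the assumption that the driver has already configured output via HCatOutputFormat.setOutput/setSchema are all hypothetical.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hcatalog.data.DefaultHCatRecord;
    import org.apache.hcatalog.data.HCatRecord;

    public class WordCountHCatReducer
        extends Reducer<Text, IntWritable, WritableComparable<?>, HCatRecord> {

      @Override
      protected void reduce(Text word, Iterable<IntWritable> counts, Context ctx)
          throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
          sum += c.get();
        }
        // The row layout must match the output schema configured on the job
        // (hypothetical two-column table: word string, cnt int).
        List<Object> row = new ArrayList<Object>(2);
        row.add(word.toString());
        row.add(sum);
        // The key is ignored by HCatOutputFormat, so pass null.
        ctx.write(null, new DefaultHCatRecord(row));
      }
    }
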
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1099539&r1=1099538&r2=1099539&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Wed May  4 17:50:42 2011
@@ -47,7 +47,7 @@ import org.apache.hcatalog.data.schema.H
 public class InitializeInput {
 
   /** The prefix for keys used for storage driver arguments */
-  private static final String HCAT_KEY_PREFIX = "hcat.";
+  static final String HCAT_KEY_PREFIX = "hcat.";
   private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
 
   private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, HCatTableInfo inputInfo) throws Exception {
@@ -146,7 +146,7 @@ public class InitializeInput {
     return ptnKeyValues;
   }
 
-  private static PartInfo extractPartInfo(StorageDescriptor sd, Map<String,String> parameters) throws IOException{
+  static PartInfo extractPartInfo(StorageDescriptor sd, Map<String,String> parameters) throws IOException{
     HCatSchema schema = HCatUtil.extractSchemaFromStorageDescriptor(sd);
     String inputStorageDriverClass = null;
     Properties howlProperties = new Properties();

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java Wed May  4 17:50:42 2011
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.PigException;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Base class for HCatLoader and HCatEximLoader
+ */
+
+public abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+
+  protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
+
+  private RecordReader<?, ?> reader;
+  protected String signature;
+
+  HCatSchema outputSchema = null;
+
+
+  @Override
+  public Tuple getNext() throws IOException {
+    try {
+      HCatRecord hr =  (HCatRecord) (reader.nextKeyValue() ? reader.getCurrentValue() : null);
+      Tuple t = PigHCatUtil.transformToTuple(hr,outputSchema);
+      // TODO: we were discussing an iter interface, and also a LazyTuple;
+      // change this when plans for that solidify.
+      return t;
+    } catch (ExecException e) {
+      int errCode = 6018;
+      String errMsg = "Error while reading input";
+      throw new ExecException(errMsg, errCode,
+          PigException.REMOTE_ENVIRONMENT, e);
+    } catch (Exception eOther){
+      int errCode = 6018;
+      String errMsg = "Error converting read value to tuple";
+      throw new ExecException(errMsg, errCode,
+          PigException.REMOTE_ENVIRONMENT, eOther);
+    }
+
+  }
+
+  @Override
+  public void prepareToRead(RecordReader reader, PigSplit arg1) throws IOException {
+    this.reader = reader;
+  }
+
+  @Override
+  public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+    // statistics not implemented currently
+    return null;
+  }
+
+  @Override
+  public List<OperatorSet> getFeatures() {
+    return Arrays.asList(LoadPushDown.OperatorSet.PROJECTION);
+  }
+
+  @Override
+  public RequiredFieldResponse pushProjection(RequiredFieldList requiredFieldsInfo) throws FrontendException {
+    // Store the required fields information in the UDFContext so that we
+    // can retrieve it later.
+    storeInUDFContext(signature, PRUNE_PROJECTION_INFO, requiredFieldsInfo);
+
+    // Howl will always prune columns based on what we ask of it - so the
+    // response is true
+    return new RequiredFieldResponse(true);
+  }
+
+  @Override
+  public void setUDFContextSignature(String signature) {
+    this.signature = signature;
+  }
+
+
+  // helper methods
+  protected void storeInUDFContext(String signature, String key, Object value) {
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties props = udfContext.getUDFProperties(
+        this.getClass(), new String[] {signature});
+    props.put(key, value);
+  }
+
+}
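
HCatBaseLoader centralizes the record-to-tuple conversion in getNext() and the projection push-down bookkeeping (pushProjection stores the RequiredFieldList in the UDFContext under the loader's signature). The skeleton below, which is illustrative and not part of the patch, shows what a concrete loader still has to supply; the real subclasses in this patch are HCatLoader and HCatEximLoader. It sits in the same package so the package-private outputSchema field is reachable.

    package org.apache.hcatalog.pig;

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.InputFormat;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.Expression;
    import org.apache.pig.ResourceSchema;

    /** Skeleton only: the methods HCatBaseLoader leaves to its subclasses. */
    public class SkeletonHCatLoader extends HCatBaseLoader {

      @Override
      public void setLocation(String location, Job job) throws IOException {
        // Resolve 'location', configure the job's input, and set outputSchema
        // so that getNext() in the base class can build tuples.
      }

      @Override
      public InputFormat getInputFormat() throws IOException {
        return null; // return the InputFormat this loader reads through
      }

      @Override
      public ResourceSchema getSchema(String location, Job job) throws IOException {
        return null; // expose the table schema to Pig
      }

      @Override
      public String[] getPartitionKeys(String location, Job job) throws IOException {
        return null; // null: no partition keys are advertised to Pig
      }

      @Override
      public void setPartitionFilter(Expression partitionFilter) throws IOException {
        // receive any partition filter Pig pushes down
      }
    }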

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseStorer.java Wed May  4 17:50:42 2011
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema.Type;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.StoreMetadata;
+import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Base class for HCatStorer and HCatEximStorer
+ *
+ */
+
+public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
+
+  /**
+   * UDFContext property key under which the computed output schema is stored.
+   */
+  protected static final String COMPUTED_OUTPUT_SCHEMA = "howl.output.schema";
+  protected final Map<String,String> partitions;
+  protected Schema pigSchema;
+  private RecordWriter<WritableComparable<?>, HCatRecord> writer;
+  protected HCatSchema computedSchema;
+  protected static final String PIG_SCHEMA = "howl.pig.store.schema";
+  protected String sign;
+
+  public HCatBaseStorer(String partSpecs, String schema) throws ParseException, FrontendException {
+
+    partitions = new HashMap<String, String>();
+    if(partSpecs != null && !partSpecs.trim().isEmpty()){
+      String[] partKVPs = partSpecs.split(",");
+      for(String partKVP : partKVPs){
+        String[] partKV = partKVP.split("=");
+        if(partKV.length == 2) {
+          partitions.put(partKV[0].trim(), partKV[1].trim());
+        } else {
+          throw new FrontendException("Invalid partition column specification. "+partSpecs, PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+      }
+    }
+
+    if(schema != null) {
+      pigSchema = Utils.getSchemaFromString(schema);
+    }
+
+  }
+
+  @Override
+  public void checkSchema(ResourceSchema resourceSchema) throws IOException {
+
+    /* The schema provided by the user and the schema computed by Pig
+     * at the time of calling store must match.
+     */
+    Schema runtimeSchema = Schema.getPigSchema(resourceSchema);
+    if(pigSchema != null){
+      if(! Schema.equals(runtimeSchema, pigSchema, false, true) ){
+        throw new FrontendException("Schema provided in store statement doesn't match with the Schema" +
+            "returned by Pig run-time. Schema provided in HowlStorer: "+pigSchema.toString()+ " Schema received from Pig runtime: "+runtimeSchema.toString(), PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    } else {
+      pigSchema = runtimeSchema;
+    }
+    UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).setProperty(PIG_SCHEMA,ObjectSerializer.serialize(pigSchema));
+  }
+
+  /** Constructs HCatSchema from pigSchema. Passed tableSchema is the existing
+   * schema of the table in metastore.
+   */
+  protected HCatSchema convertPigSchemaToHCatSchema(Schema pigSchema, HCatSchema tableSchema) throws FrontendException{
+
+    List<HCatFieldSchema> fieldSchemas = new ArrayList<HCatFieldSchema>(pigSchema.size());
+    for(FieldSchema fSchema : pigSchema.getFields()){
+      byte type = fSchema.type;
+      HCatFieldSchema howlFSchema;
+
+      try {
+
+        // Find out if we need to throw away the tuple or not.
+        if(type == DataType.BAG && removeTupleFromBag(tableSchema, fSchema)){
+          List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+          arrFields.add(getHowlFSFromPigFS(fSchema.schema.getField(0).schema.getField(0), tableSchema));
+          howlFSchema = new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), null);
+        }
+        else{
+          howlFSchema = getHowlFSFromPigFS(fSchema, tableSchema);
+        }
+        fieldSchemas.add(howlFSchema);
+      } catch (HCatException he){
+          throw new FrontendException(he.getMessage(),PigHCatUtil.PIG_EXCEPTION_CODE,he);
+      }
+    }
+
+    return new HCatSchema(fieldSchemas);
+  }
+
+  private void validateUnNested(Schema innerSchema) throws FrontendException{
+
+    for(FieldSchema innerField : innerSchema.getFields()){
+      validateAlias(innerField.alias);
+      if(DataType.isComplex(innerField.type)) {
+        throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    }
+  }
+
+  private boolean removeTupleFromBag(HCatSchema tableSchema, FieldSchema bagFieldSchema) throws HCatException{
+
+    String colName = bagFieldSchema.alias;
+    for(HCatFieldSchema field : tableSchema.getFields()){
+      if(colName.equalsIgnoreCase(field.getName())){
+        return field.getArrayElementSchema().get(0).getType() != Type.STRUCT;
+      }
+    }
+    // Column was not found in table schema. It's a new column.
+    List<FieldSchema> tupSchema = bagFieldSchema.schema.getFields();
+    return tupSchema.size() == 1 && tupSchema.get(0).schema == null;
+  }
+
+
+  private HCatFieldSchema getHowlFSFromPigFS(FieldSchema fSchema, HCatSchema hcatTblSchema) throws FrontendException, HCatException{
+
+    byte type = fSchema.type;
+    switch(type){
+
+    case DataType.CHARARRAY:
+    case DataType.BIGCHARARRAY:
+      return new HCatFieldSchema(fSchema.alias, Type.STRING, null);
+
+    case DataType.INTEGER:
+      return new HCatFieldSchema(fSchema.alias, Type.INT, null);
+
+    case DataType.LONG:
+      return new HCatFieldSchema(fSchema.alias, Type.BIGINT, null);
+
+    case DataType.FLOAT:
+      return new HCatFieldSchema(fSchema.alias, Type.FLOAT, null);
+
+    case DataType.DOUBLE:
+      return new HCatFieldSchema(fSchema.alias, Type.DOUBLE, null);
+
+    case DataType.BAG:
+      Schema bagSchema = fSchema.schema;
+      List<HCatFieldSchema> arrFields = new ArrayList<HCatFieldSchema>(1);
+      arrFields.add(getHowlFSFromPigFS(bagSchema.getField(0), hcatTblSchema));
+      return new HCatFieldSchema(fSchema.alias, Type.ARRAY, new HCatSchema(arrFields), "");
+
+    case DataType.TUPLE:
+      List<String> fieldNames = new ArrayList<String>();
+      List<HCatFieldSchema> howlFSs = new ArrayList<HCatFieldSchema>();
+      for( FieldSchema fieldSchema : fSchema.schema.getFields()){
+        fieldNames.add( fieldSchema.alias);
+        howlFSs.add(getHowlFSFromPigFS(fieldSchema, hcatTblSchema));
+      }
+      return new HCatFieldSchema(fSchema.alias, Type.STRUCT, new HCatSchema(howlFSs), "");
+
+    case DataType.MAP:{
+      // Pig's schema contains no type information about a map's keys and
+      // values. So, if it's a new column, assume <string,string>; if it's an
+      // existing column, return whatever the existing column contains.
+      HCatFieldSchema mapField = getTableCol(fSchema.alias, hcatTblSchema);
+      HCatFieldSchema valFS;
+      List<HCatFieldSchema> valFSList = new ArrayList<HCatFieldSchema>(1);
+
+      if(mapField != null){
+        Type mapValType = mapField.getMapValueSchema().get(0).getType();
+
+        switch(mapValType){
+        case STRING:
+        case BIGINT:
+        case INT:
+        case FLOAT:
+        case DOUBLE:
+          valFS = new HCatFieldSchema(fSchema.alias, mapValType, null);
+          break;
+        default:
+          throw new FrontendException("Only pig primitive types are supported as map value types.", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+        valFSList.add(valFS);
+        return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
+      }
+
+      // Column not found in target table. It's a new column; its schema is map<string,string>.
+      valFS = new HCatFieldSchema(fSchema.alias, Type.STRING, "");
+      valFSList.add(valFS);
+      return new HCatFieldSchema(fSchema.alias,Type.MAP,Type.STRING, new HCatSchema(valFSList),"");
+     }
+
+    default:
+      throw new FrontendException("Unsupported type: "+type+"  in Pig's schema", PigHCatUtil.PIG_EXCEPTION_CODE);
+    }
+  }
+
+  @Override
+  public void prepareToWrite(RecordWriter writer) throws IOException {
+    this.writer = writer;
+    computedSchema = (HCatSchema)ObjectSerializer.deserialize(UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[]{sign}).getProperty(COMPUTED_OUTPUT_SCHEMA));
+  }
+
+  @Override
+  public void putNext(Tuple tuple) throws IOException {
+
+    List<Object> outgoing = new ArrayList<Object>(tuple.size());
+
+    int i = 0;
+    for(HCatFieldSchema fSchema : computedSchema.getFields()){
+      outgoing.add(getJavaObj(tuple.get(i++), fSchema));
+    }
+    try {
+      writer.write(null, new DefaultHCatRecord(outgoing));
+    } catch (InterruptedException e) {
+      throw new BackendException("Error while writing tuple: "+tuple, PigHCatUtil.PIG_EXCEPTION_CODE, e);
+    }
+  }
+
+  private Object getJavaObj(Object pigObj, HCatFieldSchema howlFS) throws ExecException, HCatException{
+
+    // The real work-horse. Spend time and energy in this method if there is a
+    // need to keep HowlStorer lean and fast.
+    Type type = howlFS.getType();
+
+    switch(type){
+
+    case STRUCT:
+      // Unwrap the tuple.
+      return ((Tuple)pigObj).getAll();
+      //        Tuple innerTup = (Tuple)pigObj;
+      //
+      //      List<Object> innerList = new ArrayList<Object>(innerTup.size());
+      //      int i = 0;
+      //      for(HowlTypeInfo structFieldTypeInfo : typeInfo.getAllStructFieldTypeInfos()){
+      //        innerList.add(getJavaObj(innerTup.get(i++), structFieldTypeInfo));
+      //      }
+      //      return innerList;
+    case ARRAY:
+      // Unwrap the bag.
+      DataBag pigBag = (DataBag)pigObj;
+      HCatFieldSchema tupFS = howlFS.getArrayElementSchema().get(0);
+      boolean needTuple = tupFS.getType() == Type.STRUCT;
+      List<Object> bagContents = new ArrayList<Object>((int)pigBag.size());
+      Iterator<Tuple> bagItr = pigBag.iterator();
+
+      while(bagItr.hasNext()){
+        // If there is only one element in the tuple contained in the bag, we throw away the tuple.
+        bagContents.add(needTuple ? getJavaObj(bagItr.next(), tupFS) : bagItr.next().get(0));
+
+      }
+      return bagContents;
+
+      //    case MAP:
+      //     Map<String,DataByteArray> pigMap = (Map<String,DataByteArray>)pigObj;
+      //     Map<String,Long> typeMap = new HashMap<String, Long>();
+      //     for(Entry<String, DataByteArray> entry: pigMap.entrySet()){
+      //       typeMap.put(entry.getKey(), new Long(entry.getValue().toString()));
+      //     }
+      //     return typeMap;
+    default:
+      return pigObj;
+    }
+  }
+
+  @Override
+  public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException {
+
+    // We need to override this method since the default implementation assumes
+    // an HDFS-based location string.
+    return location;
+  }
+
+  @Override
+  public void setStoreFuncUDFContextSignature(String signature) {
+    sign = signature;
+  }
+
+
+  protected void doSchemaValidations(Schema pigSchema, HCatSchema tblSchema) throws FrontendException, HCatException{
+
+    // Iterate through all the elements in the Pig schema and do validations as
+    // dictated by semantics, consulting the table's HCatSchema when needed.
+
+    for(FieldSchema pigField : pigSchema.getFields()){
+      byte type = pigField.type;
+      String alias = pigField.alias;
+      validateAlias(alias);
+      HCatFieldSchema howlField = getTableCol(alias, tblSchema);
+
+      if(DataType.isComplex(type)){
+        switch(type){
+
+        case DataType.MAP:
+          if(howlField != null){
+            if(howlField.getMapKeyType() != Type.STRING){
+              throw new FrontendException("Key Type of map must be String "+howlField,  PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            if(howlField.getMapValueSchema().get(0).isComplex()){
+              throw new FrontendException("Value type of map cannot be complex" + howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+          }
+          break;
+
+        case DataType.BAG:
+          // Only map is allowed as complex type in tuples inside bag.
+          for(FieldSchema innerField : pigField.schema.getField(0).schema.getFields()){
+            if(innerField.type == DataType.BAG || innerField.type == DataType.TUPLE) {
+              throw new FrontendException("Complex types cannot be nested. "+innerField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+            validateAlias(innerField.alias);
+          }
+          if(howlField != null){
+            // Do the same validation for HCatSchema.
+            HCatFieldSchema arrayFieldSchema = howlField.getArrayElementSchema().get(0);
+            Type hType = arrayFieldSchema.getType();
+            if(hType == Type.STRUCT){
+              for(HCatFieldSchema structFieldInBag : arrayFieldSchema.getStructSubSchema().getFields()){
+                if(structFieldInBag.getType() == Type.STRUCT || structFieldInBag.getType() == Type.ARRAY){
+                  throw new FrontendException("Nested Complex types not allowed "+ howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+                }
+              }
+            }
+            if(hType == Type.MAP){
+              if(arrayFieldSchema.getMapKeyType() != Type.STRING){
+                throw new FrontendException("Key Type of map must be String "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+              }
+              if(arrayFieldSchema.getMapValueSchema().get(0).isComplex()){
+                throw new FrontendException("Value type of map cannot be complex "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+              }
+            }
+            if(hType == Type.ARRAY) {
+              throw new FrontendException("Arrays cannot contain nested arrays. "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+            }
+          }
+          break;
+
+        case DataType.TUPLE:
+          validateUnNested(pigField.schema);
+          if(howlField != null){
+            for(HCatFieldSchema structFieldSchema : howlField.getStructSubSchema().getFields()){
+              if(structFieldSchema.isComplex()){
+                throw new FrontendException("Nested Complex types are not allowed."+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+              }
+            }
+          }
+          break;
+
+        default:
+          throw new FrontendException("Internal Error.", PigHCatUtil.PIG_EXCEPTION_CODE);
+        }
+      }
+    }
+
+    for(HCatFieldSchema howlField : tblSchema.getFields()){
+
+      // We don't do type promotion/demotion.
+      Type hType = howlField.getType();
+      switch(hType){
+      case SMALLINT:
+      case TINYINT:
+      case BOOLEAN:
+        throw new FrontendException("Incompatible type found in howl table schema: "+howlField, PigHCatUtil.PIG_EXCEPTION_CODE);
+      }
+    }
+  }
+
+  private void validateAlias(String alias) throws FrontendException{
+    if(alias == null) {
+      throw new FrontendException("Column name for a field is not specified. Please provide the full schema as an argument to HCatStorer.", PigHCatUtil.PIG_EXCEPTION_CODE);
+    }
+    if(alias.matches(".*[A-Z]+.*")) {
+      throw new FrontendException("Column names should all be in lowercase. Invalid name found: "+alias, PigHCatUtil.PIG_EXCEPTION_CODE);
+    }
+  }
+
+  // Finds a column by name in HCatSchema; returns null if not found.
+  private HCatFieldSchema getTableCol(String alias, HCatSchema tblSchema){
+
+    for(HCatFieldSchema howlField : tblSchema.getFields()){
+      if(howlField.getName().equalsIgnoreCase(alias)){
+        return howlField;
+      }
+    }
+    // It's a new column.
+    return null;
+  }
+
+  @Override
+  public void cleanupOnFailure(String location, Job job) throws IOException {
+    // No-op.
+  }
+
+  @Override
+  public void storeStatistics(ResourceStatistics stats, String arg1, Job job) throws IOException {
+  }
+}
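
The HCatBaseStorer constructor takes the partition specification as one comma-separated string of key=value pairs and the optional Pig schema as a string handed to Utils.getSchemaFromString. The standalone snippet below is not part of the patch; it simply restates the constructor's parsing, with made-up values, to make the accepted format explicit.

    import java.util.HashMap;
    import java.util.Map;

    /** Mirrors the partition-spec parsing in the HCatBaseStorer constructor above. */
    public class PartSpecFormatDemo {
      public static void main(String[] args) {
        String partSpecs = "datestamp=20110504, region=us"; // hypothetical input
        Map<String, String> partitions = new HashMap<String, String>();
        if (partSpecs != null && !partSpecs.trim().isEmpty()) {
          for (String partKVP : partSpecs.split(",")) {
            String[] partKV = partKVP.split("=");
            if (partKV.length == 2) {
              partitions.put(partKV[0].trim(), partKV[1].trim());
            } else {
              throw new IllegalArgumentException(
                  "Invalid partition column specification: " + partSpecs);
            }
          }
        }
        // Keys and values are trimmed, so spaces around '=' and ',' are tolerated.
        System.out.println(partitions); // e.g. {region=us, datestamp=20110504}
      }
    }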

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximLoader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximLoader.java?rev=1099539&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximLoader.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximLoader.java Wed May  4 17:50:42 2011
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
+import org.apache.hcatalog.mapreduce.HCatEximInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.pig.Expression;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Pig {@link LoadFunc} to read data/metadata from an HCatalog exported location.
+ */
+
+public class HCatEximLoader extends HCatBaseLoader {
+
+  private static final Log LOG = LogFactory.getLog(HCatEximLoader.class);
+
+  private HCatSchema tableSchema;
+  private HCatSchema partitionSchema;
+  private HCatEximInputFormat inputFormat;
+
+  public HCatEximLoader() {
+    LOG.debug("HCatEximLoader ctored");
+  }
+
+  @Override
+  public ResourceSchema getSchema(String location, Job job) throws IOException {
+    LOG.debug("getSchema with location :" + location);
+    if (tableSchema == null) {
+      List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+      tableSchema = rv.get(0);
+      partitionSchema = rv.get(1);
+    }
+    LOG.debug("getSchema got schema :" + tableSchema.toString());
+    List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+    colsPlusPartKeys.addAll(tableSchema.getFields());
+    colsPlusPartKeys.addAll(partitionSchema.getFields());
+    outputSchema = new HCatSchema(colsPlusPartKeys);
+    return PigHCatUtil.getResourceSchema(outputSchema);
+  }
+
+  @Override
+  public String[] getPartitionKeys(String location, Job job) throws IOException {
+    LOG.warn("getPartitionKeys with location :" + location);
+    /*
+    if (tableSchema == null) {
+      List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+      tableSchema = rv.get(0);
+      partitionSchema = rv.get(1);
+    }
+    return partitionSchema.getFieldNames().toArray(new String[0]);
+    */
+    return null;
+  }
+
+  @Override
+  public void setPartitionFilter(Expression partitionFilter) throws IOException {
+    LOG.debug("setPartitionFilter with filter :" + partitionFilter.toString());
+  }
+
+  @Override
+  public void setLocation(String location, Job job) throws IOException {
+    LOG.debug("setLocation with location :" + location);
+    List<HCatSchema> rv = HCatEximInputFormat.setInput(job, location, null);
+    tableSchema = rv.get(0);
+    partitionSchema = rv.get(1);
+    List<HCatFieldSchema> colsPlusPartKeys = new ArrayList<HCatFieldSchema>();
+    colsPlusPartKeys.addAll(tableSchema.getFields());
+    colsPlusPartKeys.addAll(partitionSchema.getFields());
+    outputSchema = new HCatSchema(colsPlusPartKeys);
+    UDFContext udfContext = UDFContext.getUDFContext();
+    Properties props = udfContext.getUDFProperties(this.getClass(),
+          new String[] {signature});
+    RequiredFieldList requiredFieldsInfo =
+          (RequiredFieldList) props.get(PRUNE_PROJECTION_INFO);
+    if (requiredFieldsInfo != null) {
+      ArrayList<HCatFieldSchema> fcols = new ArrayList<HCatFieldSchema>();
+      for (RequiredField rf : requiredFieldsInfo.getFields()) {
+        fcols.add(tableSchema.getFields().get(rf.getIndex()));
+      }
+      outputSchema = new HCatSchema(fcols);
+      try {
+        HCatBaseInputFormat.setOutputSchema(job, outputSchema);
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
+    }
+  }
+
+
+  @Override
+  public InputFormat getInputFormat() throws IOException {
+    if (inputFormat == null) {
+      inputFormat = new HCatEximInputFormat();
+    }
+    return inputFormat;
+  }
+
+}
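
HCatEximLoader wires the exported location into the job through HCatEximInputFormat.setInput, which returns the table schema and the partition schema, and then narrows the columns with HCatBaseInputFormat.setOutputSchema when Pig has pushed a projection down. The same calls can be used from a plain MapReduce driver, sketched below. The export path, job name and driver class are hypothetical, and the third setInput argument is passed as null here only because that is how the loader calls it.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatBaseInputFormat;
    import org.apache.hcatalog.mapreduce.HCatEximInputFormat;

    /** Sketch of a MapReduce driver reading an HCatalog export directly. */
    public class EximReadDriverSketch {
      public static void main(String[] args) throws Exception {
        Job job = new Job(new Configuration(), "read-exported-table");

        // Returns [tableSchema, partitionSchema], as consumed by HCatEximLoader.
        List<HCatSchema> schemas =
            HCatEximInputFormat.setInput(job, "hdfs:///tmp/exports/mytable", null);
        HCatSchema tableSchema = schemas.get(0);
        HCatSchema partitionSchema = schemas.get(1); // partition columns of the export

        // Optionally project: hand only the needed columns to the mappers.
        HCatBaseInputFormat.setOutputSchema(job, tableSchema);

        job.setInputFormatClass(HCatEximInputFormat.class);
        // ... set mapper/reducer and output format, then job.waitForCompletion(true)
      }
    }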


