incubator-hcatalog-commits mailing list archives

From: ga...@apache.org
Subject: svn commit: r1241353 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapred/ src/java/org/apache/hcatalog/storagehandler/ src/test/org/apache/hcatalog/data/ src/test/org/apache/hcatalog/mapred/
Date: Tue, 07 Feb 2012 05:49:09 GMT
Author: gates
Date: Tue Feb  7 05:49:08 2012
New Revision: 1241353

URL: http://svn.apache.org/viewvc?rev=1241353&view=rev
Log:
HCATALOG-208. mapred HCatInputFormat/HCatOutputFormat changes to make it work from hive

Added:
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/
    incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1241353&r1=1241352&r2=1241353&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Feb  7 05:49:08 2012
@@ -35,6 +35,8 @@ Release 0.3.0 - Unreleased
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-208. mapred HCatInputFormat/HCatOutputFormat changes to make it work from hive (khorgath via gates)
+
   HCAT-207. Changes to current HCat subsystem to allow it to work with hive (khorgath via gates)
 
   HCAT-204. HCatRecord SerDe (khorgath via gates)

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+
+
+public class HCatMapredInputFormat implements InputFormat {
+
+  
+  private static final Log LOG = LogFactory.getLog(HCatMapredInputFormat.class);
+  HCatInputFormat hci;
+  
+  public HCatMapredInputFormat(){
+    hci = new HCatInputFormat();
+  }
+  
+  @Override
+  public RecordReader getRecordReader(InputSplit split, JobConf job,
+      Reporter arg2) throws IOException {
+    try {
+      org.apache.hadoop.mapreduce.RecordReader<WritableComparable, HCatRecord> rr;
+      TaskAttemptContext taContext 
+      = HCatHadoopShims.Instance.get().createTaskAttemptContext(job, new TaskAttemptID());
+      rr = hci.createRecordReader(((HiveHCatSplitWrapper)split).getHCatSplit(), taContext);
+      rr.initialize(((HiveHCatSplitWrapper)split).getHCatSplit(),taContext);
+      return (RecordReader) rr;
+
+    } catch (java.lang.InterruptedException e){
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int arg1) throws IOException {
+
+    try {
+      List<InputSplit> hsplits = new ArrayList<InputSplit>();
+      for (org.apache.hadoop.mapreduce.InputSplit hs : hci.getSplits(
+          HCatHadoopShims.Instance.get().createJobContext(job, new JobID()))){
+        HiveHCatSplitWrapper hwrapper = new HiveHCatSplitWrapper((HCatSplit)hs);
+
+        String hwrapperPath = hwrapper.getPath().toString();
+        String mapredInputDir = job.get("mapred.input.dir","null");
+
+        if(hwrapperPath.startsWith(mapredInputDir)){
+          hsplits.add(hwrapper);
+        }
+      }
+      InputSplit[] splits = new InputSplit[hsplits.size()];
+      for (int i = 0 ; i <hsplits.size(); i++){
+        splits[i] = hsplits.get(i);
+      }
+      return splits;
+    } catch (java.lang.InterruptedException e){
+      throw new IOException(e);
+    }
+  }
+  
+  public static void setTableDesc(TableDesc tableDesc, Map<String,String> jobProperties) throws IOException{
+    try {
+    Pair<String,String> dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName());
+    InputJobInfo info = InputJobInfo.create(dbAndTableName.first, dbAndTableName.second, "", null, null);
+    jobProperties.put(HCatConstants.HCAT_KEY_JOB_INFO
+        ,InitializeInput.getSerializedHcatKeyJobInfo(
+            null, info,tableDesc.getProperties().getProperty("location")));
+    } catch (Exception e){
+      throw new IOException(e);
+    }
+  }
+
+}

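A rough usage sketch, not part of this change, of how Hive's mapred-side plumbing is expected to drive HCatMapredInputFormat: setTableDesc() serializes the HCat input job info into the job properties, those properties are copied into the JobConf, and only then are splits and a record reader requested. The tableDesc and tableLocation values below are placeholders for what Hive's query plan would supply; the classes are the ones already imported by the file above (plus java.util.HashMap).

    // Illustrative only; assumes a reachable metastore and a TableDesc from Hive's plan.
    Map<String, String> jobProperties = new HashMap<String, String>();
    HCatMapredInputFormat.setTableDesc(tableDesc, jobProperties);   // serializes InputJobInfo into the properties
    JobConf job = new JobConf();
    for (Map.Entry<String, String> e : jobProperties.entrySet()) {
      job.set(e.getKey(), e.getValue());                            // hand the job info to the input format
    }
    job.set("mapred.input.dir", tableLocation);                     // hypothetical table location; getSplits() filters on it
    HCatMapredInputFormat inputFormat = new HCatMapredInputFormat();
    InputSplit[] splits = inputFormat.getSplits(job, 1);            // only splits whose path starts with mapred.input.dir survive
    RecordReader reader = inputFormat.getRecordReader(splits[0], job, Reporter.NULL);
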
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.Pair;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils.CollectionBuilder;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.InitializeInput;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.shims.HCatHadoopShims;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+public class HCatMapredOutputFormat implements OutputFormat, HiveOutputFormat {
+
+  HCatOutputFormat hco;
+  private static final Log LOG = LogFactory.getLog(HCatMapredOutputFormat.class);
+  
+  public HCatMapredOutputFormat() {
+    LOG.debug("HCatMapredOutputFormat init");
+    hco = new HCatOutputFormat();
+  }
+  
+  @Override
+  public void checkOutputSpecs(FileSystem arg0, JobConf arg1)
+      throws IOException {
+    LOG.debug("HCatMapredOutputFormat checkOutputSpecs");
+    JobContext context = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID());
+    try {
+      hco.checkOutputSpecs(context);
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage());
+      HCatUtil.logStackTrace(LOG);
+    }
+  }
+
+  @Override
+  public RecordWriter getRecordWriter(FileSystem arg0, JobConf arg1,
+      String arg2, Progressable arg3) throws IOException {
+    // this is never really called from hive, but it's part of the mapred OutputFormat interface
+    
+    LOG.debug("HCatMapredOutputFormat getRecordWriter");
+    return getRW(arg1);
+  }
+
+  public HCatMapredRecordWriter getRW(Configuration arg1) throws IOException {
+    try {
+      JobContext jc = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID());
+      TaskAttemptContext taContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(arg1, new TaskAttemptID());
+    return new HCatMapredOutputFormat.HCatMapredRecordWriter(hco,jc,taContext);
+    } catch (Exception e){
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+      JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+    LOG.debug("HCatMapredOutputFormat getHiveRecordWriter");
+    final HCatMapredRecordWriter rw = getRW(jc);
+    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+      public void write(Writable r) throws IOException {
+        rw.write(null, (HCatRecord) r);
+      }
+      public void close(boolean abort) throws IOException {
+        rw.setAbortStatus(abort);
+        rw.close(null);
+      }
+    };
+
+  }
+  
+  public static void setTableDesc(TableDesc tableDesc, Map<String,String> jobProperties) throws IOException {
+    setTableDesc(tableDesc,jobProperties,new LinkedHashMap<String, String>());
+  }
+
+  public static void setPartitionDesc(PartitionDesc ptnDesc, Map<String,String> jobProperties) throws IOException {
+    setTableDesc(ptnDesc.getTableDesc(),jobProperties,ptnDesc.getPartSpec());
+  }
+
+  public static void setTableDesc(TableDesc tableDesc, Map<String,String> jobProperties, Map<String,String> ptnValues) throws IOException {
+    Pair<String,String> dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName());
+
+    OutputJobInfo outputJobInfo = OutputJobInfo.create(
+        dbAndTableName.first, dbAndTableName.second, 
+        ptnValues, null, null);
+    
+    Job job = new Job(new Configuration()); 
+      // TODO : verify with thw if this needs to be shim-ed. There exists no current Shim
+      // for instantiating a Job, and we use it only temporarily.
+    
+    HCatOutputFormat.setOutput(job, outputJobInfo);
+    LOG.debug("HCatOutputFormat.setOutput() done");
+    
+    // Now we need to set the schema we intend to write
+    
+    Properties tprops = tableDesc.getProperties();
+    String columnNameProperty = tprops.getProperty(Constants.LIST_COLUMNS);
+    String columnTypeProperty = tprops.getProperty(Constants.LIST_COLUMN_TYPES);
+    
+    List<String> columnNames;
+    // all table column names
+    if (columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    
+    List<TypeInfo> columnTypes;
+    // all column types
+    if (columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    HCatSchema hsch = HCatSchemaUtils.getHCatSchema(rowTypeInfo).getFields().get(0).getStructSubSchema();
+      // getting inner schema, because it's the difference between struct<i:int,j:int> and i:int,j:int.
+      // and that's what we need to provide to HCatOutputFormat
+    
+    LOG.debug("schema "+hsch.toString());
+    HCatOutputFormat.setSchema(job, hsch);
+    
+    for (String confToSave : HCatConstants.OUTPUT_CONFS_TO_SAVE){
+      String confVal = job.getConfiguration().get(confToSave);
+      if (confVal != null){
+        jobProperties.put(confToSave, confVal);
+      }
+    }
+
+  }
+  
+  public class HCatMapredRecordWriter implements org.apache.hadoop.mapred.RecordWriter<WritableComparable<?>, HCatRecord>{
+
+    org.apache.hadoop.mapreduce.RecordWriter writer;
+    org.apache.hadoop.mapreduce.OutputCommitter outputCommitter;
+    TaskAttemptContext taContext;
+    JobContext jc;
+    boolean jobIsSetup = false;
+    boolean wroteData = false;
+    boolean aborted = false;
+    
+    public HCatMapredRecordWriter(
+       HCatOutputFormat hco, JobContext jc,
+        TaskAttemptContext taContext) throws IOException{
+      this.taContext = taContext;
+      try {
+        this.outputCommitter = hco.getOutputCommitter(taContext);
+        this.writer = hco.getRecordWriter(taContext);
+      } catch (java.lang.InterruptedException e){
+        throw new IOException(e);
+      }
+      this.wroteData = false;
+      this.aborted = false;
+    }
+    
+    public void setAbortStatus(boolean abort) {
+      this.aborted = abort;
+    }
+    
+    @Override
+    public void close(Reporter arg0) throws IOException {
+      try {
+        writer.close(taContext);
+        if (outputCommitter.needsTaskCommit(taContext)){
+          outputCommitter.commitTask(taContext);
+        }
+        if (this.wroteData && this.jobIsSetup){
+          if (!this.aborted){
+            outputCommitter.commitJob(taContext);
+          } else {
+            outputCommitter.cleanupJob(taContext);
+          }
+        }
+      } catch (Exception e){
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    public void write(WritableComparable arg0, HCatRecord arg1) throws IOException {
+      try {
+        if (!jobIsSetup){
+          this.outputCommitter.setupJob(taContext);
+          jobIsSetup = true;
+        }
+        writer.write(arg0, arg1);
+        this.wroteData = true;
+      } catch (Exception e){
+        throw new IOException(e);
+      }
+    }
+    
+  }
+}

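A similar sketch, also not part of this change, of the write path above: Hive's FileSinkOperator obtains a writer through getHiveRecordWriter(), streams HCatRecords into it, and signals success or abort through close(boolean); HCatMapredRecordWriter.close() then commits or cleans up via the OutputCommitter. The jobConf, finalOutPath, tableProperties and record values are placeholders for what Hive would supply once setTableDesc()/setPartitionDesc() has populated the job configuration.

    // Illustrative only; assumes the JobConf already carries the properties
    // produced by HCatMapredOutputFormat.setTableDesc().
    HCatMapredOutputFormat outputFormat = new HCatMapredOutputFormat();
    org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter writer =
        outputFormat.getHiveRecordWriter(jobConf, finalOutPath, HCatRecord.class,
            false /* isCompressed */, tableProperties, null /* progressable */);
    writer.write(record);   // write(Writable); the wrapper casts the value to HCatRecord
    writer.close(false);    // false = not aborted, so the task (and job, if set up) gets committed
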
Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.HCatSplit;
+
+/**
+ * Even though HiveInputSplit expects an InputSplit to wrap, it
+ * expects getPath() to work from the underlying split. And since
+ * that's populated by HiveInputSplit only if the underlying
+ * split is a FileSplit, the HCatSplit that goes to Hive needs
+ * to be a FileSplit. And since FileSplit is a class, and 
+ * mapreduce.InputSplit is also a class, we can't do the trick
+ * where we implement mapred.InputSplit and extend mapreduce.InputSplit.
+ * 
+ * Thus, we compose the other HCatSplit, and work with it.
+ * 
+ * Also, this means that reading HCat through Hive will only work
+ * when the underlying InputFormat's InputSplit has implemented
+ * a getPath() - either by subclassing FileSplit, or by itself -
+ * we make a best effort attempt to call a getPath() via reflection,
+ * but if that doesn't work, this isn't going to work.
+ *
+ */
+public class HiveHCatSplitWrapper extends FileSplit implements InputSplit {
+  
+  Log LOG = LogFactory.getLog(HiveHCatSplitWrapper.class);
+
+  HCatSplit hsplit;
+  
+  public HiveHCatSplitWrapper() {
+    super((Path) null, 0, 0, (String[]) null);
+  }
+  
+  public HiveHCatSplitWrapper(HCatSplit hsplit) {
+    this();
+    this.hsplit = hsplit;
+  }
+
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    hsplit = new HCatSplit();
+    hsplit.readFields(input);
+  }
+  
+  @Override
+  public void write(DataOutput output) throws IOException {
+    hsplit.write(output);
+  }
+  
+  @Override
+  public long getLength() {
+    return hsplit.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return hsplit.getLocations();
+  }
+
+  @Override
+  public Path getPath() {
+    /**
+     * This function is the reason this class exists at all.
+     * See class description for why.
+     */
+    if (hsplit.getBaseSplit() instanceof FileSplit){
+      // if baseSplit is a FileSplit, then return that.
+      return ((FileSplit)hsplit.getBaseSplit()).getPath();
+    } else {
+      // use reflection to try and determine if underlying class has a getPath() method that returns a path
+      Class<?> c = hsplit.getBaseSplit().getClass();
+      try {
+        return (Path) (c.getMethod("getPath")).invoke(hsplit.getBaseSplit());
+      } catch (Exception e) {
+        HCatUtil.logStackTrace(LOG);
+        // not much we can do - default exit will return null Path
+      }
+      
+    }
+    LOG.error("Returning empty path from getPath(), Hive will not be happy.");
+    return new Path(""); // This will cause hive to error, but we can't do anything for that situation.
+  }
+
+  public HCatSplit getHCatSplit() {
+    return hsplit;
+  }
+  
+}

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.storagehandler;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.ql.security.authorization.Privilege;
+
+/**
+ * This class is a dummy implementation of HiveAuthorizationProvider to provide
+ * dummy authorization functionality for other classes to extend and override.
+ */
+class DummyHCatAuthProvider implements HiveAuthorizationProvider {
+    
+    @Override
+    public Configuration getConf() {
+        return null;
+    }
+    
+    @Override
+    public void setConf(Configuration conf) {
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+     * #init(org.apache.hadoop.conf.Configuration)
+     */
+    @Override
+    public void init(Configuration conf) throws HiveException {
+    }
+    
+    @Override
+    public HiveAuthenticationProvider getAuthenticator() {
+        return null;
+    }
+    
+    @Override
+    public void setAuthenticator(HiveAuthenticationProvider authenticator) {
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+     * #authorize(org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+     */
+    @Override
+    public void authorize(Privilege[] readRequiredPriv,
+            Privilege[] writeRequiredPriv) throws HiveException,
+            AuthorizationException {
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+     * #authorize(org.apache.hadoop.hive.metastore.api.Database,
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+     */
+    @Override
+    public void authorize(Database db, Privilege[] readRequiredPriv,
+            Privilege[] writeRequiredPriv) throws HiveException,
+            AuthorizationException {
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+     * #authorize(org.apache.hadoop.hive.ql.metadata.Table,
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+     */
+    @Override
+    public void authorize(Table table, Privilege[] readRequiredPriv,
+            Privilege[] writeRequiredPriv) throws HiveException,
+            AuthorizationException {
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+     * #authorize(org.apache.hadoop.hive.ql.metadata.Partition,
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+     */
+    @Override
+    public void authorize(Partition part, Privilege[] readRequiredPriv,
+            Privilege[] writeRequiredPriv) throws HiveException,
+            AuthorizationException {
+    }
+    
+    /*
+     * (non-Javadoc)
+     * 
+     * @see
+     * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider
+     * #authorize(org.apache.hadoop.hive.ql.metadata.Table,
+     * org.apache.hadoop.hive.ql.metadata.Partition, java.util.List,
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[],
+     * org.apache.hadoop.hive.ql.security.authorization.Privilege[])
+     */
+    @Override
+    public void authorize(Table table, Partition part, List<String> columns,
+            Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv)
+            throws HiveException, AuthorizationException {
+    }
+    
+}

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java?rev=1241353&r1=1241352&r2=1241353&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java Tue Feb  7 05:49:08 2012
@@ -186,7 +186,7 @@ public abstract class HCatStorageHandler
      * ()
      */
     @Override
-    public final Class<? extends InputFormat> getInputFormatClass() {
+    public Class<? extends InputFormat> getInputFormatClass() {
         return DummyInputFormat.class;
     }
     
@@ -198,7 +198,7 @@ public abstract class HCatStorageHandler
      * ()
      */
     @Override
-    public final Class<? extends OutputFormat> getOutputFormatClass() {
+    public Class<? extends OutputFormat> getOutputFormatClass() {
         return DummyOutputFormat.class;
     }
     

Added: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java (added)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,163 @@
+package org.apache.hcatalog.storagehandler;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecordSerDe;
+import org.apache.hcatalog.mapred.HCatMapredInputFormat;
+import org.apache.hcatalog.mapred.HCatMapredOutputFormat;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.apache.hcatalog.storagehandler.HCatStorageHandler.DummyInputFormat;
+import org.apache.hcatalog.storagehandler.HCatStorageHandler.DummyOutputFormat;
+
+public class HCatStorageHandlerImpl extends HCatStorageHandler {
+
+  Class isd;
+  Class osd;
+
+  Log LOG = LogFactory.getLog(HCatStorageHandlerImpl.class);
+  
+  @Override
+  public Class<? extends HCatInputStorageDriver> getInputStorageDriver() {
+    return isd;
+  }
+
+  @Override
+  public Class<? extends HCatOutputStorageDriver> getOutputStorageDriver() {
+    return osd;
+  }
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider()
+      throws HiveException {
+    return new DummyHCatAuthProvider();
+  }
+
+  @Override
+  public void commitCreateTable(Table table) throws MetaException {
+  }
+
+  @Override
+  public void commitDropTable(Table table, boolean deleteData)
+      throws MetaException {
+    // do nothing special
+  }
+
+  @Override
+  public void preCreateTable(Table table) throws MetaException {
+    // do nothing special
+  }
+
+  @Override
+  public void preDropTable(Table table) throws MetaException {
+    // do nothing special
+  }
+
+  @Override
+  public void rollbackCreateTable(Table table) throws MetaException {
+    // do nothing special
+  }
+
+  @Override
+  public void rollbackDropTable(Table table) throws MetaException {
+    // do nothing special
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return this;
+  }
+
+  @Override
+  public void configureTableJobProperties(TableDesc tableDesc,
+      Map<String, String> jobProperties) {
+    // Information about the table and the job to be performed
+    // We pass them on into the mapred InputFormat / OutputFormat
+
+    Properties tprops = tableDesc.getProperties();
+
+    if(LOG.isDebugEnabled()){
+      LOG.debug("HCatStorageHandlerImpl configureTableJobProperties:");
+      HCatUtil.logStackTrace(LOG);
+      HCatUtil.logMap(LOG, "jobProperties", jobProperties);
+      if (tprops!= null){
+        HCatUtil.logEntrySet(LOG, "tableprops", tprops.entrySet());
+      }
+      LOG.debug("tablename : "+tableDesc.getTableName());
+    }
+    
+    // copy existing table props first
+    for (Entry e : tprops.entrySet()){
+      jobProperties.put((String)e.getKey(), (String)e.getValue());
+    }
+    
+    // try to set input format related properties
+    try {
+      HCatMapredInputFormat.setTableDesc(tableDesc,jobProperties);
+    } catch (IOException ioe){
+      // ok, things are probably not going to work, but we
+      // can't throw out exceptions per interface. So, we log.
+      LOG.error("HCatInputFormat init fail " + ioe.getMessage());
+      LOG.error(ioe.getStackTrace());
+    }
+
+    // try to set output format related properties
+    try {
+      HCatMapredOutputFormat.setTableDesc(tableDesc,jobProperties);
+    } catch (IOException ioe){
+      // ok, things are probably not going to work, but we
+      // can't throw out exceptions per interface. So, we log.
+      LOG.error("HCatOutputFormat init fail " + ioe.getMessage());
+      LOG.error(ioe.getStackTrace());
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+  }
+
+  @Override
+  public Class<? extends SerDe> getSerDeClass() {
+      return HCatRecordSerDe.class;
+  }
+
+  @Override
+  public final Class<? extends InputFormat> getInputFormatClass() {
+      return HCatMapredInputFormat.class;
+  }
+  
+  @Override
+  public final Class<? extends OutputFormat> getOutputFormatClass() {
+      return HCatMapredOutputFormat.class;
+  }
+
+}

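For context, a sketch of the DDL that wires a Hive table to this handler, modeled on the table definitions used in the new TestHiveHCatInputFormat below; the table name is made up and the driver tblproperties are copied from that test.

    // Run through a Hive Driver such as the one HCatDataCheckUtil.instantiateDriver() builds.
    driver.run("create table demo_hcat_table (i int, j int, s string) "
        + "stored by 'org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl' "
        + "tblproperties ('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
        + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')");
    // Reads and writes on demo_hcat_table then go through HCatMapredInputFormat /
    // HCatMapredOutputFormat as picked by the storage handler.
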
Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.data;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+
+/**
+ * Helper class for Other Data Testers
+ */
+public class HCatDataCheckUtil {
+
+  public static Driver instantiateDriver(MiniCluster cluster) {
+    HiveConf hiveConf = new HiveConf(HCatDataCheckUtil.class);
+    for (Entry e : cluster.getProperties().entrySet()){
+      hiveConf.set(e.getKey().toString(), e.getValue().toString());
+    }
+    hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, "");
+    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
+    
+    Log logger = LogFactory.getLog(HCatOutputFormat.class);
+    HCatUtil.logHiveConf(logger , hiveConf);
+    
+    Driver driver = new Driver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    return driver;
+  }
+
+  public static void generateDataFile(MiniCluster cluster, String fileName) throws IOException {
+    MiniCluster.deleteFile(cluster, fileName);
+    String[] input = new String[50];
+    for(int i = 0; i < 50; i++) {
+      input[i] = (i % 5) + "\t" + i  + "\t" + "_S" + i + "S_";
+    }
+    MiniCluster.createInputFile(cluster, fileName, input);
+  }
+
+  public static void createTable(Driver driver, String tableName, String createTableArgs)
+      throws CommandNeedRetryException, IOException {
+    String createTable = "create table " + tableName + createTableArgs;
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table. ["+createTable+"], return code from hive driver : ["+retCode+"]");
+    }
+  }
+
+  public static void dropTable(Driver driver, String tablename) throws IOException, CommandNeedRetryException{
+    driver.run("drop table if exists "+tablename);
+  }
+
+  public static ArrayList<String> formattedRun(Driver driver, String name, String selectCmd)
+      throws CommandNeedRetryException, IOException {
+    driver.run(selectCmd);
+    ArrayList<String> src_values = new ArrayList<String>();
+    driver.getResults(src_values);
+    for (String s : src_values){
+      System.out.println(name+":"+s);
+    }
+    return src_values;
+  }
+
+}

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java?rev=1241353&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java Tue Feb  7 05:49:08 2012
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hcatalog.MiniCluster;
+import org.apache.hcatalog.data.HCatDataCheckUtil;
+import org.apache.hcatalog.mapred.HCatMapredInputFormat;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.impl.util.UDFContext;
+
+public class TestHiveHCatInputFormat extends TestCase {
+  private static MiniCluster cluster = MiniCluster.buildCluster();
+  private static Driver driver;
+
+  String PTNED_TABLE = "junit_testhiveinputintegration_ptni";
+  String UNPTNED_TABLE = "junit_testhiveinputintegration_noptn";
+  String basicFile = "/tmp/"+PTNED_TABLE+".file";
+
+  public void testFromHive() throws Exception {
+    if (driver == null){
+      driver = HCatDataCheckUtil.instantiateDriver(cluster);
+    }
+
+    Properties props = new Properties();
+    props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name"));
+    String basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile;
+
+    cleanup();
+    
+    // create source data file
+    HCatDataCheckUtil.generateDataFile(cluster,basicFile);
+
+    String createPtnedTable = "(j int, s string) partitioned by (i int) "
+        +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties"
+        + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+        + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    
+    HCatDataCheckUtil.createTable(driver,PTNED_TABLE,createPtnedTable);
+    
+    String createUnptnedTable = "(i int, j int, s string) "
+        +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties"
+        + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
+        + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') ";
+    
+    HCatDataCheckUtil.createTable(driver,UNPTNED_TABLE,createUnptnedTable);
+    
+
+    driver.run("describe extended "+UNPTNED_TABLE);
+    ArrayList<String> des_values = new ArrayList<String>();
+    driver.getResults(des_values);
+    for (String s : des_values){
+      System.err.println("du:"+s);
+    }
+
+    driver.run("describe extended "+PTNED_TABLE);
+    ArrayList<String> des2_values = new ArrayList<String>();
+    driver.getResults(des2_values);
+    for (String s : des2_values){
+      System.err.println("dp:"+s);
+    }
+    
+    // use pig to read from source file and put into this table
+
+    PigServer server = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    server.setBatchOn();
+    server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int, s:chararray);");
+    server.registerQuery("store A into '"+UNPTNED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+    server.executeBatch();
+
+    server.setBatchOn();
+    server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int, s:chararray);");
+    server.registerQuery("store A into '"+PTNED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();");
+    server.executeBatch();
+
+    // partitioned by i
+    //  select * from tbl;
+    //  select j,s,i from tbl;
+    //  select * from tbl where i = 3;
+    //  select j,s,i from tbl where i = 3;
+    //  select * from tbl where j = 3;
+    //  select j,s,i from tbl where j = 3;
+
+    ArrayList<String> p_select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
+        "p_select_star_nofilter","select * from "+PTNED_TABLE);
+    ArrayList<String> p_select_named_nofilter = HCatDataCheckUtil.formattedRun(driver,
+        "p_select_named_nofilter","select j,s,i from "+PTNED_TABLE);
+
+    assertDataIdentical(p_select_star_nofilter,p_select_named_nofilter,50);
+    
+    ArrayList<String> p_select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "p_select_star_ptnfilter","select * from "+PTNED_TABLE+" where i = 3");
+    ArrayList<String> p_select_named_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "p_select_named_ptnfilter","select j,s,i from "+PTNED_TABLE+" where i = 3");
+
+    assertDataIdentical(p_select_star_ptnfilter,p_select_named_ptnfilter,10);
+
+    ArrayList<String> select_star_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_star_nonptnfilter","select * from "+PTNED_TABLE+" where j = 28");
+    ArrayList<String> select_named_nonptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_named_nonptnfilter","select j,s,i from "+PTNED_TABLE+" where j = 28");
+
+    assertDataIdentical(select_star_nonptnfilter,select_named_nonptnfilter,1);
+    
+    // non-partitioned
+    //  select * from tbl;
+    //  select i,j,s from tbl;
+    //  select * from tbl where i = 3;
+    //  select i,j,s from tbl where i = 3;
+
+    //  select j,s,i from tbl;
+    //  select j,s,i from tbl where i = 3;
+
+    ArrayList<String> select_star_nofilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_star_nofilter","select * from "+UNPTNED_TABLE); //i,j,s select * order is diff for unptn
+    ArrayList<String> select_ijs_nofilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_ijs_nofilter","select i,j,s from "+UNPTNED_TABLE);
+
+    assertDataIdentical(select_star_nofilter,select_ijs_nofilter,50);
+
+    ArrayList<String> select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_star_ptnfilter","select * from "+UNPTNED_TABLE+" where i = 3"); //i,j,s
+    ArrayList<String> select_ijs_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_ijs_ptnfilter","select i,j,s from "+UNPTNED_TABLE+" where i = 3");
+
+    assertDataIdentical(select_star_ptnfilter,select_ijs_ptnfilter,10);
+
+    ArrayList<String> select_jsi_nofilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_jsi_nofilter","select j,s,i from "+UNPTNED_TABLE);
+    assertDataIdentical(p_select_named_nofilter,select_jsi_nofilter,50,true);
+
+    ArrayList<String> select_jsi_ptnfilter = HCatDataCheckUtil.formattedRun(driver,
+        "select_jsi_ptnfilter","select j,s,i from "+UNPTNED_TABLE+" where i = 3");
+    assertDataIdentical(p_select_named_ptnfilter,select_jsi_ptnfilter,10,true);
+
+  }
+
+  private void assertDataIdentical(ArrayList<String> result1,
+      ArrayList<String> result2, int numRecords) {
+    assertDataIdentical(result1,result2,numRecords,false);
+  }
+
+  private void assertDataIdentical(ArrayList<String> result1,
+      ArrayList<String> result2, int numRecords,boolean doSort) {
+    assertEquals(numRecords, result1.size());
+    assertEquals(numRecords, result2.size());
+    Collections.sort(result1);
+    Collections.sort(result2);
+    for (int i = 0; i < numRecords; i++){
+      assertEquals(result1.get(i),result2.get(i));
+    }
+  }
+
+
+  private void cleanup() throws IOException, CommandNeedRetryException {
+    MiniCluster.deleteFile(cluster, basicFile);
+    HCatDataCheckUtil.dropTable(driver,PTNED_TABLE);
+    HCatDataCheckUtil.dropTable(driver,UNPTNED_TABLE);
+  }
+
+}


