From: gates@apache.org
To: hcatalog-commits@incubator.apache.org
Subject: svn commit: r1241354 - in /incubator/hcatalog/branches/branch-0.3: ./ src/java/org/apache/hcatalog/mapred/ src/java/org/apache/hcatalog/storagehandler/ src/test/org/apache/hcatalog/data/ src/test/org/apache/hcatalog/mapred/
Date: Tue, 07 Feb 2012 05:50:44 -0000

Author: gates
Date: Tue Feb 7 05:50:44 2012
New Revision: 1241354

URL: http://svn.apache.org/viewvc?rev=1241354&view=rev
Log:
HCATALOG-208. mapred HCatInputFormat/HCatOutputFormat changes to make it work from hive

Added:
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java
    incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java
    incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/mapred/
    incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java
Modified:
    incubator/hcatalog/branches/branch-0.3/CHANGES.txt
    incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java

Modified: incubator/hcatalog/branches/branch-0.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/CHANGES.txt?rev=1241354&r1=1241353&r2=1241354&view=diff
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/CHANGES.txt (original)
+++ incubator/hcatalog/branches/branch-0.3/CHANGES.txt Tue Feb 7 05:50:44 2012
@@ -23,6 +23,8 @@ Release 0.3.0 (unreleased changes)
   INCOMPATIBLE CHANGES

   NEW FEATURES
+  HCAT-208. mapred HCatInputFormat/HCatOutputFormat changes to make it work from hive (khorgath via gates)
+
   HCAT-207. Changes to current HCat subsystem to allow it to work with hive (khorgath via gates)

   HCAT-204. HCatRecord SerDe (khorgath via gates)

Added: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java?rev=1241354&view=auto
==============================================================================
--- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java (added)
+++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredInputFormat.java Tue Feb 7 05:50:44 2012
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.hcatalog.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Collection; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hcatalog.mapreduce.HCatSplit; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.Pair; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.mapreduce.InitializeInput; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.shims.HCatHadoopShims; +import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl; +import org.apache.hadoop.hive.ql.plan.TableDesc; + + + +public class HCatMapredInputFormat implements InputFormat { + + + private static final Log LOG = LogFactory.getLog(HCatMapredInputFormat.class); + HCatInputFormat hci; + + public HCatMapredInputFormat(){ + hci = new HCatInputFormat(); + } + + @Override + public RecordReader getRecordReader(InputSplit split, JobConf job, + Reporter arg2) throws IOException { + try { + org.apache.hadoop.mapreduce.RecordReader rr; + TaskAttemptContext taContext + = HCatHadoopShims.Instance.get().createTaskAttemptContext(job, new TaskAttemptID()); + rr = hci.createRecordReader(((HiveHCatSplitWrapper)split).getHCatSplit(), taContext); + rr.initialize(((HiveHCatSplitWrapper)split).getHCatSplit(),taContext); + return (RecordReader) rr; + + } catch (java.lang.InterruptedException e){ + throw new IOException(e); + } + } + + @Override + public InputSplit[] getSplits(JobConf job, int arg1) throws IOException { + + try { + List hsplits = new ArrayList(); + for (org.apache.hadoop.mapreduce.InputSplit hs : hci.getSplits( + HCatHadoopShims.Instance.get().createJobContext(job, new JobID()))){ + HiveHCatSplitWrapper hwrapper = new HiveHCatSplitWrapper((HCatSplit)hs); + + String hwrapperPath = hwrapper.getPath().toString(); + String mapredInputDir = job.get("mapred.input.dir","null"); + + if(hwrapperPath.startsWith(mapredInputDir)){ + hsplits.add(hwrapper); + } + } + InputSplit[] splits = new InputSplit[hsplits.size()]; + for (int i = 0 ; i jobProperties) throws IOException{ + try { + Pair dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName()); + InputJobInfo info = InputJobInfo.create(dbAndTableName.first, dbAndTableName.second, "", null, null); + jobProperties.put(HCatConstants.HCAT_KEY_JOB_INFO + ,InitializeInput.getSerializedHcatKeyJobInfo( + null, info,tableDesc.getProperties().getProperty("location"))); + } catch 
(Exception e){ + throw new IOException(e); + } + } + +} Added: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java?rev=1241354&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java (added) +++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HCatMapredOutputFormat.java Tue Feb 7 05:50:44 2012 @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.util.Progressable; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecord; +import org.apache.hcatalog.data.schema.HCatSchema; +import org.apache.hcatalog.data.Pair; +import org.apache.hcatalog.data.schema.HCatSchemaUtils; +import org.apache.hcatalog.data.schema.HCatSchemaUtils.CollectionBuilder; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; +import org.apache.hcatalog.mapreduce.InitializeInput; +import org.apache.hcatalog.mapreduce.InputJobInfo; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hcatalog.shims.HCatHadoopShims; +import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl; +import org.apache.hadoop.hive.ql.plan.PartitionDesc; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +public class HCatMapredOutputFormat implements OutputFormat, HiveOutputFormat { + + HCatOutputFormat hco; + private static final Log LOG = LogFactory.getLog(HCatMapredOutputFormat.class); + + public HCatMapredOutputFormat() { + LOG.debug("HCatMapredOutputFormat init"); + hco = new HCatOutputFormat(); + } + + @Override + public void checkOutputSpecs(FileSystem arg0, JobConf arg1) + throws IOException { + LOG.debug("HCatMapredOutputFormat checkOutputSpecs"); + JobContext context = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID()); + try { + hco.checkOutputSpecs(context); + } catch (InterruptedException e) { + LOG.warn(e.getMessage()); + HCatUtil.logStackTrace(LOG); + } + } + + @Override + public RecordWriter getRecordWriter(FileSystem arg0, JobConf arg1, + String arg2, Progressable arg3) throws IOException { + // this is never really called from hive, but it's part of the IF interface + + LOG.debug("HCatMapredOutputFormat getRecordWriter"); + return getRW(arg1); + } + + public HCatMapredRecordWriter getRW(Configuration arg1) throws IOException { + try { + JobContext jc = HCatHadoopShims.Instance.get().createJobContext(arg1, new JobID()); + TaskAttemptContext taContext = HCatHadoopShims.Instance.get().createTaskAttemptContext(arg1, new TaskAttemptID()); + return new HCatMapredOutputFormat.HCatMapredRecordWriter(hco,jc,taContext); + } catch (Exception e){ + throw new IOException(e); + } + } + + @Override + public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter( + JobConf jc, Path finalOutPath, Class valueClass, boolean isCompressed, + Properties tableProperties, Progressable progress) throws IOException { + LOG.debug("HCatMapredOutputFormat getHiveRecordWriter"); + final HCatMapredRecordWriter rw = getRW(jc); + return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() { + public void write(Writable r) throws IOException { + rw.write(null, (HCatRecord) r); + } + public void close(boolean abort) throws IOException { + rw.setAbortStatus(abort); + rw.close(null); + } + }; + + } + + public static void setTableDesc(TableDesc tableDesc, Map jobProperties) throws IOException { + setTableDesc(tableDesc,jobProperties,new LinkedHashMap()); + } + + public static void setPartitionDesc(PartitionDesc ptnDesc, Map jobProperties) throws IOException { + setTableDesc(ptnDesc.getTableDesc(),jobProperties,ptnDesc.getPartSpec()); + } + + public static void setTableDesc(TableDesc tableDesc, Map jobProperties, Map ptnValues) throws IOException { + Pair dbAndTableName = HCatUtil.getDbAndTableName(tableDesc.getTableName()); + + OutputJobInfo outputJobInfo = OutputJobInfo.create( + dbAndTableName.first, dbAndTableName.second, + ptnValues, null, null); + + Job job = new Job(new Configuration()); + // TODO : verify with thw if this needs to be shim-ed. There exists no current Shim + // for instantiating a Job, and we use it only temporarily. 
+ + HCatOutputFormat.setOutput(job, outputJobInfo); + LOG.debug("HCatOutputFormat.setOutput() done"); + + // Now we need to set the schema we intend to write + + Properties tprops = tableDesc.getProperties(); + String columnNameProperty = tprops.getProperty(Constants.LIST_COLUMNS); + String columnTypeProperty = tprops.getProperty(Constants.LIST_COLUMN_TYPES); + + List columnNames; + // all table column names + if (columnNameProperty.length() == 0) { + columnNames = new ArrayList(); + } else { + columnNames = Arrays.asList(columnNameProperty.split(",")); + } + + List columnTypes; + // all column types + if (columnTypeProperty.length() == 0) { + columnTypes = new ArrayList(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); + } + + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); + HCatSchema hsch = HCatSchemaUtils.getHCatSchema(rowTypeInfo).getFields().get(0).getStructSubSchema(); + // getting inner schema, because it's the difference between struct and i:int,j:int. + // and that's what we need to provide to HCatOutputFormat + + LOG.debug("schema "+hsch.toString()); + HCatOutputFormat.setSchema(job, hsch); + + for (String confToSave : HCatConstants.OUTPUT_CONFS_TO_SAVE){ + String confVal = job.getConfiguration().get(confToSave); + if (confVal != null){ + jobProperties.put(confToSave, confVal); + } + } + + } + + public class HCatMapredRecordWriter implements org.apache.hadoop.mapred.RecordWriter, HCatRecord>{ + + org.apache.hadoop.mapreduce.RecordWriter writer; + org.apache.hadoop.mapreduce.OutputCommitter outputCommitter; + TaskAttemptContext taContext; + JobContext jc; + boolean jobIsSetup = false; + boolean wroteData = false; + boolean aborted = false; + + public HCatMapredRecordWriter( + HCatOutputFormat hco, JobContext jc, + TaskAttemptContext taContext) throws IOException{ + this.taContext = taContext; + try { + this.outputCommitter = hco.getOutputCommitter(taContext); + this.writer = hco.getRecordWriter(taContext); + } catch (java.lang.InterruptedException e){ + throw new IOException(e); + } + this.wroteData = false; + this.aborted = false; + } + + public void setAbortStatus(boolean abort) { + this.aborted = abort; + } + + @Override + public void close(Reporter arg0) throws IOException { + try { + writer.close(taContext); + if (outputCommitter.needsTaskCommit(taContext)){ + outputCommitter.commitTask(taContext); + } + if (this.wroteData && this.jobIsSetup){ + if (!this.aborted){ + outputCommitter.commitJob(taContext); + } else { + outputCommitter.cleanupJob(taContext); + } + } + } catch (Exception e){ + throw new IOException(e); + } + } + + @Override + public void write(WritableComparable arg0, HCatRecord arg1) throws IOException { + try { + if (!jobIsSetup){ + this.outputCommitter.setupJob(taContext); + jobIsSetup = true; + } + writer.write(arg0, arg1); + this.wroteData = true; + } catch (Exception e){ + throw new IOException(e); + } + } + + } +} Added: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java?rev=1241354&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java (added) +++ 
incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/mapred/HiveHCatSplitWrapper.java Tue Feb 7 05:50:44 2012 @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.mapred; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.mapreduce.HCatSplit; + +/** + * Even though HiveInputSplit expects an InputSplit to wrap, it + * expects getPath() to work from the underlying split. And since + * that's populated by HiveInputSplit only if the underlying + * split is a FileSplit, the HCatSplit that goes to Hive needs + * to be a FileSplit. And since FileSplit is a class, and + * mapreduce.InputSplit is also a class, we can't do the trick + * where we implement mapred.inputSplit and extend mapred.InputSplit. + * + * Thus, we compose the other HCatSplit, and work with it. + * + * Also, this means that reading HCat through Hive will only work + * when the underlying InputFormat's InputSplit has implemented + * a getPath() - either by subclassing FileSplit, or by itself - + * we make a best effort attempt to call a getPath() via reflection, + * but if that doesn't work, this isn't going to work. + * + */ +public class HiveHCatSplitWrapper extends FileSplit implements InputSplit { + + Log LOG = LogFactory.getLog(HiveHCatSplitWrapper.class); + + HCatSplit hsplit; + + public HiveHCatSplitWrapper() { + super((Path) null, 0, 0, (String[]) null); + } + + public HiveHCatSplitWrapper(HCatSplit hsplit) { + this(); + this.hsplit = hsplit; + } + + @Override + public void readFields(DataInput input) throws IOException { + hsplit = new HCatSplit(); + hsplit.readFields(input); + } + + @Override + public void write(DataOutput output) throws IOException { + hsplit.write(output); + } + + @Override + public long getLength() { + return hsplit.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return hsplit.getLocations(); + } + + @Override + public Path getPath() { + /** + * This function is the reason this class exists at all. + * See class description for why. + */ + if (hsplit.getBaseSplit() instanceof FileSplit){ + // if baseSplit is a FileSplit, then return that. 
+ return ((FileSplit)hsplit.getBaseSplit()).getPath(); + } else { + // use reflection to try and determine if underlying class has a getPath() method that returns a path + Class c = hsplit.getBaseSplit().getClass(); + try { + return (Path) (c.getMethod("getPath")).invoke(hsplit.getBaseSplit()); + } catch (Exception e) { + HCatUtil.logStackTrace(LOG); + // not much we can do - default exit will return null Path + } + + } + LOG.error("Returning empty path from getPath(), Hive will not be happy."); + return new Path(""); // This will cause hive to error, but we can't do anything for that situation. + } + + public HCatSplit getHCatSplit() { + return hsplit; + } + +} Added: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java?rev=1241354&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java (added) +++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/DummyHCatAuthProvider.java Tue Feb 7 05:50:44 2012 @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hcatalog.storagehandler; + +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.ql.metadata.AuthorizationException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.security.HiveAuthenticationProvider; +import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.ql.security.authorization.Privilege; + +/** + * This class is a dummy implementation of HiveAuthorizationProvider to provide + * dummy authorization functionality for other classes to extend and override. 
+ */ +class DummyHCatAuthProvider implements HiveAuthorizationProvider { + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void setConf(Configuration conf) { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider + * #init(org.apache.hadoop.conf.Configuration) + */ + @Override + public void init(Configuration conf) throws HiveException { + } + + @Override + public HiveAuthenticationProvider getAuthenticator() { + return null; + } + + @Override + public void setAuthenticator(HiveAuthenticationProvider authenticator) { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider + * #authorize(org.apache.hadoop.hive.ql.security.authorization.Privilege[], + * org.apache.hadoop.hive.ql.security.authorization.Privilege[]) + */ + @Override + public void authorize(Privilege[] readRequiredPriv, + Privilege[] writeRequiredPriv) throws HiveException, + AuthorizationException { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider + * #authorize(org.apache.hadoop.hive.metastore.api.Database, + * org.apache.hadoop.hive.ql.security.authorization.Privilege[], + * org.apache.hadoop.hive.ql.security.authorization.Privilege[]) + */ + @Override + public void authorize(Database db, Privilege[] readRequiredPriv, + Privilege[] writeRequiredPriv) throws HiveException, + AuthorizationException { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider + * #authorize(org.apache.hadoop.hive.ql.metadata.Table, + * org.apache.hadoop.hive.ql.security.authorization.Privilege[], + * org.apache.hadoop.hive.ql.security.authorization.Privilege[]) + */ + @Override + public void authorize(Table table, Privilege[] readRequiredPriv, + Privilege[] writeRequiredPriv) throws HiveException, + AuthorizationException { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider + * #authorize(org.apache.hadoop.hive.ql.metadata.Partition, + * org.apache.hadoop.hive.ql.security.authorization.Privilege[], + * org.apache.hadoop.hive.ql.security.authorization.Privilege[]) + */ + @Override + public void authorize(Partition part, Privilege[] readRequiredPriv, + Privilege[] writeRequiredPriv) throws HiveException, + AuthorizationException { + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider + * #authorize(org.apache.hadoop.hive.ql.metadata.Table, + * org.apache.hadoop.hive.ql.metadata.Partition, java.util.List, + * org.apache.hadoop.hive.ql.security.authorization.Privilege[], + * org.apache.hadoop.hive.ql.security.authorization.Privilege[]) + */ + @Override + public void authorize(Table table, Partition part, List columns, + Privilege[] readRequiredPriv, Privilege[] writeRequiredPriv) + throws HiveException, AuthorizationException { + } + +} Modified: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java?rev=1241354&r1=1241353&r2=1241354&view=diff ============================================================================== --- 
incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java (original) +++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandler.java Tue Feb 7 05:50:44 2012 @@ -186,7 +186,7 @@ public abstract class HCatStorageHandler * () */ @Override - public final Class getInputFormatClass() { + public Class getInputFormatClass() { return DummyInputFormat.class; } @@ -198,7 +198,7 @@ public abstract class HCatStorageHandler * () */ @Override - public final Class getOutputFormatClass() { + public Class getOutputFormatClass() { return DummyOutputFormat.class; } Added: incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java?rev=1241354&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java (added) +++ incubator/hcatalog/branches/branch-0.3/src/java/org/apache/hcatalog/storagehandler/HCatStorageHandlerImpl.java Tue Feb 7 05:50:44 2012 @@ -0,0 +1,163 @@ +package org.apache.hcatalog.storagehandler; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.logging.Logger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.data.HCatRecordSerDe; +import org.apache.hcatalog.mapred.HCatMapredInputFormat; +import org.apache.hcatalog.mapred.HCatMapredOutputFormat; +import org.apache.hcatalog.mapreduce.HCatInputStorageDriver; +import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver; +import org.apache.hcatalog.mapreduce.OutputJobInfo; +import org.apache.hcatalog.storagehandler.HCatStorageHandler.DummyInputFormat; +import org.apache.hcatalog.storagehandler.HCatStorageHandler.DummyOutputFormat; + +public class HCatStorageHandlerImpl extends HCatStorageHandler { + + Class isd; + Class osd; + + Log LOG = LogFactory.getLog(HCatStorageHandlerImpl.class); + + @Override + public Class getInputStorageDriver() { + return isd; + } + + @Override + public Class getOutputStorageDriver() { + return osd; + } + + @Override + public HiveAuthorizationProvider getAuthorizationProvider() + throws HiveException { + return new DummyHCatAuthProvider(); + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + } + + @Override + public void 
commitDropTable(Table table, boolean deleteData) + throws MetaException { + // do nothing special + } + + @Override + public void preCreateTable(Table table) throws MetaException { + // do nothing special + } + + @Override + public void preDropTable(Table table) throws MetaException { + // do nothing special + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + // do nothing special + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // do nothing special + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public void configureTableJobProperties(TableDesc tableDesc, + Map jobProperties) { + // Information about the table and the job to be performed + // We pass them on into the mepredif / mapredof + + Properties tprops = tableDesc.getProperties(); + + if(LOG.isDebugEnabled()){ + LOG.debug("HCatStorageHandlerImpl configureTableJobProperties:"); + HCatUtil.logStackTrace(LOG); + HCatUtil.logMap(LOG, "jobProperties", jobProperties); + if (tprops!= null){ + HCatUtil.logEntrySet(LOG, "tableprops", tprops.entrySet()); + } + LOG.debug("tablename : "+tableDesc.getTableName()); + } + + // copy existing table props first + for (Entry e : tprops.entrySet()){ + jobProperties.put((String)e.getKey(), (String)e.getValue()); + } + + // try to set input format related properties + try { + HCatMapredInputFormat.setTableDesc(tableDesc,jobProperties); + } catch (IOException ioe){ + // ok, things are probably not going to work, but we + // can't throw out exceptions per interface. So, we log. + LOG.error("HCatInputFormat init fail " + ioe.getMessage()); + LOG.error(ioe.getStackTrace()); + } + + // try to set output format related properties + try { + HCatMapredOutputFormat.setTableDesc(tableDesc,jobProperties); + } catch (IOException ioe){ + // ok, things are probably not going to work, but we + // can't throw out exceptions per interface. So, we log. + LOG.error("HCatOutputFormat init fail " + ioe.getMessage()); + LOG.error(ioe.getStackTrace()); + } + } + + @Override + public Configuration getConf() { + return null; + } + + @Override + public void setConf(Configuration conf) { + } + + @Override + public Class getSerDeClass() { + return HCatRecordSerDe.class; + } + + @Override + public final Class getInputFormatClass() { + return HCatMapredInputFormat.class; + } + + @Override + public final Class getOutputFormatClass() { + return HCatMapredOutputFormat.class; + } + +} Added: incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java?rev=1241354&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java (added) +++ incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/data/HCatDataCheckUtil.java Tue Feb 7 05:50:44 2012 @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hcatalog.data; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hcatalog.MiniCluster; +import org.apache.hcatalog.common.HCatUtil; +import org.apache.hcatalog.mapreduce.HCatOutputFormat; + +/** + * Helper class for Other Data Testers + */ +public class HCatDataCheckUtil { + + public static Driver instantiateDriver(MiniCluster cluster) { + HiveConf hiveConf = new HiveConf(HCatDataCheckUtil.class); + for (Entry e : cluster.getProperties().entrySet()){ + hiveConf.set(e.getKey().toString(), e.getValue().toString()); + } + hiveConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + + Log logger = LogFactory.getLog(HCatOutputFormat.class); + HCatUtil.logHiveConf(logger , hiveConf); + + Driver driver = new Driver(hiveConf); + SessionState.start(new CliSessionState(hiveConf)); + return driver; + } + + public static void generateDataFile(MiniCluster cluster, String fileName) throws IOException { + MiniCluster.deleteFile(cluster, fileName); + String[] input = new String[50]; + for(int i = 0; i < 50; i++) { + input[i] = (i % 5) + "\t" + i + "\t" + "_S" + i + "S_"; + } + MiniCluster.createInputFile(cluster, fileName, input); + } + + public static void createTable(Driver driver, String tableName, String createTableArgs) + throws CommandNeedRetryException, IOException { + String createTable = "create table " + tableName + createTableArgs; + int retCode = driver.run(createTable).getResponseCode(); + if(retCode != 0) { + throw new IOException("Failed to create table. 
["+createTable+"], return code from hive driver : ["+retCode+"]"); + } + } + + public static void dropTable(Driver driver, String tablename) throws IOException, CommandNeedRetryException{ + driver.run("drop table if exists "+tablename); + } + + public static ArrayList formattedRun(Driver driver, String name, String selectCmd) + throws CommandNeedRetryException, IOException { + driver.run(selectCmd); + ArrayList src_values = new ArrayList(); + driver.getResults(src_values); + for (String s : src_values){ + System.out.println(name+":"+s); + } + return src_values; + } + +} Added: incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java?rev=1241354&view=auto ============================================================================== --- incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java (added) +++ incubator/hcatalog/branches/branch-0.3/src/test/org/apache/hcatalog/mapred/TestHiveHCatInputFormat.java Tue Feb 7 05:50:44 2012 @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hcatalog.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import junit.framework.TestCase; + +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hadoop.hive.ql.Driver; +import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hcatalog.MiniCluster; +import org.apache.hcatalog.data.HCatDataCheckUtil; +import org.apache.hcatalog.mapred.HCatMapredInputFormat; +import org.apache.hcatalog.mapreduce.HCatInputFormat; +import org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl; +import org.apache.pig.ExecType; +import org.apache.pig.PigServer; +import org.apache.pig.impl.util.UDFContext; + +public class TestHiveHCatInputFormat extends TestCase { + private static MiniCluster cluster = MiniCluster.buildCluster(); + private static Driver driver; + + String PTNED_TABLE = "junit_testhiveinputintegration_ptni"; + String UNPTNED_TABLE = "junit_testhiveinputintegration_noptn"; + String basicFile = "/tmp/"+PTNED_TABLE+".file"; + + public void testFromHive() throws Exception { + if (driver == null){ + driver = HCatDataCheckUtil.instantiateDriver(cluster); + } + + Properties props = new Properties(); + props.setProperty("fs.default.name", cluster.getProperties().getProperty("fs.default.name")); + String basicFileFullName = cluster.getProperties().getProperty("fs.default.name") + basicFile; + + cleanup(); + + // create source data file + HCatDataCheckUtil.generateDataFile(cluster,basicFile); + + String createPtnedTable = "(j int, s string) partitioned by (i int) " + +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties" + + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," + + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') "; + + HCatDataCheckUtil.createTable(driver,PTNED_TABLE,createPtnedTable); + + String createUnptnedTable = "(i int, j int, s string) " + +"stored by '"+HCatStorageHandlerImpl.class.getName()+"' tblproperties" + + "('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver'," + + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver') "; + + HCatDataCheckUtil.createTable(driver,UNPTNED_TABLE,createUnptnedTable); + + + driver.run("describe extended "+UNPTNED_TABLE); + ArrayList des_values = new ArrayList(); + driver.getResults(des_values); + for (String s : des_values){ + System.err.println("du:"+s); + } + + driver.run("describe extended "+PTNED_TABLE); + ArrayList des2_values = new ArrayList(); + driver.getResults(des2_values); + for (String s : des2_values){ + System.err.println("dp:"+s); + } + + // use pig to read from source file and put into this table + + PigServer server = new PigServer(ExecType.LOCAL, props); + UDFContext.getUDFContext().setClientSystemProps(); + server.setBatchOn(); + server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int, s:chararray);"); + server.registerQuery("store A into '"+UNPTNED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();"); + server.executeBatch(); + + server.setBatchOn(); + server.registerQuery("A = load '"+basicFileFullName+"' as (i:int, j:int, s:chararray);"); + server.registerQuery("store A into '"+PTNED_TABLE+"' using org.apache.hcatalog.pig.HCatStorer();"); + server.executeBatch(); + + // partitioned by i + // select * from tbl; + // select j,s,i from tbl; + // select * from tbl where i = 3; + // select j,s,i from tbl where i = 3; + // select * 
from tbl where j = 3; + // select j,s,i from tbl where j = 3; + + ArrayList p_select_star_nofilter = HCatDataCheckUtil.formattedRun(driver, + "p_select_star_nofilter","select * from "+PTNED_TABLE); + ArrayList p_select_named_nofilter = HCatDataCheckUtil.formattedRun(driver, + "p_select_named_nofilter","select j,s,i from "+PTNED_TABLE); + + assertDataIdentical(p_select_star_nofilter,p_select_named_nofilter,50); + + ArrayList p_select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver, + "p_select_star_ptnfilter","select * from "+PTNED_TABLE+" where i = 3"); + ArrayList p_select_named_ptnfilter = HCatDataCheckUtil.formattedRun(driver, + "p_select_named_ptnfilter","select j,s,i from "+PTNED_TABLE+" where i = 3"); + + assertDataIdentical(p_select_star_ptnfilter,p_select_named_ptnfilter,10); + + ArrayList select_star_nonptnfilter = HCatDataCheckUtil.formattedRun(driver, + "select_star_nonptnfilter","select * from "+PTNED_TABLE+" where j = 28"); + ArrayList select_named_nonptnfilter = HCatDataCheckUtil.formattedRun(driver, + "select_named_nonptnfilter","select j,s,i from "+PTNED_TABLE+" where j = 28"); + + assertDataIdentical(select_star_nonptnfilter,select_named_nonptnfilter,1); + + // non-partitioned + // select * from tbl; + // select i,j,s from tbl; + // select * from tbl where i = 3; + // select i,j,s from tbl where i = 3; + + // select j,s,i from tbl; + // select j,s,i from tbl where i = 3; + + ArrayList select_star_nofilter = HCatDataCheckUtil.formattedRun(driver, + "select_star_nofilter","select * from "+UNPTNED_TABLE); //i,j,s select * order is diff for unptn + ArrayList select_ijs_nofilter = HCatDataCheckUtil.formattedRun(driver, + "select_ijs_nofilter","select i,j,s from "+UNPTNED_TABLE); + + assertDataIdentical(select_star_nofilter,select_ijs_nofilter,50); + + ArrayList select_star_ptnfilter = HCatDataCheckUtil.formattedRun(driver, + "select_star_ptnfilter","select * from "+UNPTNED_TABLE+" where i = 3"); //i,j,s + ArrayList select_ijs_ptnfilter = HCatDataCheckUtil.formattedRun(driver, + "select_ijs_ptnfilter","select i,j,s from "+UNPTNED_TABLE+" where i = 3"); + + assertDataIdentical(select_star_ptnfilter,select_ijs_ptnfilter,10); + + ArrayList select_jsi_nofilter = HCatDataCheckUtil.formattedRun(driver, + "select_jsi_nofilter","select j,s,i from "+UNPTNED_TABLE); + assertDataIdentical(p_select_named_nofilter,select_jsi_nofilter,50,true); + + ArrayList select_jsi_ptnfilter = HCatDataCheckUtil.formattedRun(driver, + "select_jsi_ptnfilter","select j,s,i from "+UNPTNED_TABLE+" where i = 3"); + assertDataIdentical(p_select_named_ptnfilter,select_jsi_ptnfilter,10,true); + + } + + private void assertDataIdentical(ArrayList result1, + ArrayList result2, int numRecords) { + assertDataIdentical(result1,result2,numRecords,false); + } + + private void assertDataIdentical(ArrayList result1, + ArrayList result2, int numRecords,boolean doSort) { + assertEquals(numRecords, result1.size()); + assertEquals(numRecords, result2.size()); + Collections.sort(result1); + Collections.sort(result2); + for (int i = 0; i < numRecords; i++){ + assertEquals(result1.get(i),result2.get(i)); + } + } + + + private void cleanup() throws IOException, CommandNeedRetryException { + MiniCluster.deleteFile(cluster, basicFile); + HCatDataCheckUtil.dropTable(driver,PTNED_TABLE); + HCatDataCheckUtil.dropTable(driver,UNPTNED_TABLE); + } + +}
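
For reference, a condensed sketch of the end-to-end read path this commit enables, distilled from TestHiveHCatInputFormat above: a Hive table declared with the new HCatStorageHandlerImpl is created and queried through the Hive Driver, which routes the read through HCatMapredInputFormat. This is not part of the commit; the class name HCatFromHiveDemo and the table name hcat_demo are illustrative, and it assumes a Hive/HCatalog classpath and metastore comparable to the test's MiniCluster setup.

// Illustrative sketch only (not part of r1241354), condensed from
// TestHiveHCatInputFormat: create a table stored by HCatStorageHandlerImpl
// and read it back through the Hive Driver.
import java.util.ArrayList;

import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;

public class HCatFromHiveDemo {
  public static void main(String[] args) throws Exception {
    // session setup mirroring HCatDataCheckUtil.instantiateDriver()
    HiveConf hiveConf = new HiveConf(HCatFromHiveDemo.class);
    hiveConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false");
    Driver driver = new Driver(hiveConf);
    SessionState.start(new CliSessionState(hiveConf));

    // Table backed by the new storage handler; hcat.isd/hcat.osd name the
    // RCFile storage drivers, exactly as in the test's DDL.
    driver.run("create table hcat_demo (j int, s string) partitioned by (i int) "
        + "stored by 'org.apache.hcatalog.storagehandler.HCatStorageHandlerImpl' "
        + "tblproperties ('hcat.isd'='org.apache.hcatalog.rcfile.RCFileInputDriver',"
        + "'hcat.osd'='org.apache.hcatalog.rcfile.RCFileOutputDriver')");

    // A select with a partition filter exercises the path-prefix filtering
    // that HCatMapredInputFormat.getSplits() applies via mapred.input.dir.
    driver.run("select j, s, i from hcat_demo where i = 3");
    ArrayList<String> rows = new ArrayList<String>();
    driver.getResults(rows);
    for (String row : rows) {
      System.out.println(row);
    }
  }
}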