From: gunt...@apache.org
Subject: svn commit: r1555255 - in /hive/branches/tez: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/header_footer_table_1/ data/files/header_footer_table_2/ itests/qtest/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/had...
Date: Fri, 03 Jan 2014 21:09:56 GMT
Author: gunther
Date: Fri Jan  3 21:09:55 2014
New Revision: 1555255

URL: http://svn.apache.org/r1555255
Log:
Merge latest trunk into branch. (Gunther Hagleitner)

Added:
    hive/branches/tez/data/files/header_footer_table_1/
      - copied from r1555253, hive/trunk/data/files/header_footer_table_1/
    hive/branches/tez/data/files/header_footer_table_2/
      - copied from r1555253, hive/trunk/data/files/header_footer_table_2/
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java
      - copied unchanged from r1555253, hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java
    hive/branches/tez/ql/src/test/queries/clientnegative/file_with_header_footer_negative.q
      - copied unchanged from r1555253, hive/trunk/ql/src/test/queries/clientnegative/file_with_header_footer_negative.q
    hive/branches/tez/ql/src/test/queries/clientpositive/file_with_header_footer.q
      - copied unchanged from r1555253, hive/trunk/ql/src/test/queries/clientpositive/file_with_header_footer.q
    hive/branches/tez/ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out
      - copied unchanged from r1555253, hive/trunk/ql/src/test/results/clientnegative/file_with_header_footer_negative.q.out
    hive/branches/tez/ql/src/test/results/clientpositive/file_with_header_footer.q.out
      - copied unchanged from r1555253, hive/trunk/ql/src/test/results/clientpositive/file_with_header_footer.q.out
Modified:
    hive/branches/tez/   (props changed)
    hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/tez/conf/hive-default.xml.template
    hive/branches/tez/itests/qtest/pom.xml
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
    hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
    hive/branches/tez/serde/if/serde.thrift
    hive/branches/tez/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java

Propchange: hive/branches/tez/
------------------------------------------------------------------------------
  Merged /hive/trunk:r1555193-1555253

Modified: hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/tez/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Jan  3 21:09:55 2014
@@ -223,6 +223,9 @@ public class HiveConf extends Configurat
     // ignore the mapjoin hint
     HIVEIGNOREMAPJOINHINT("hive.ignore.mapjoin.hint", true),
 
+    // Max number of lines of footer user can set for a table file.
+    HIVE_FILE_MAX_FOOTER("hive.file.max.footer", 100),
+
     // Hadoop Configuration Properties
     // Properties with null values are ignored and exist only for the purpose of giving us
     // a symbolic name to reference in the Hive source code. Properties with non-null
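
For orientation (not part of the commit itself): the new variable is read through
HiveConf.getIntVar, as Utilities.getFooterCount does further down in this commit.
A minimal sketch, assuming a standard Hive classpath, of setting and reading it
programmatically:

    import org.apache.hadoop.hive.conf.HiveConf;

    // Sketch: override the new footer limit in code rather than in
    // hive-site.xml. HIVE_FILE_MAX_FOOTER is the ConfVars entry added above.
    public class MaxFooterDemo {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setIntVar(HiveConf.ConfVars.HIVE_FILE_MAX_FOOTER, 200);
        // Static accessor used by Utilities.getFooterCount() in this commit:
        System.out.println(HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_FILE_MAX_FOOTER)); // 200
      }
    }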

Modified: hive/branches/tez/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/branches/tez/conf/hive-default.xml.template?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/conf/hive-default.xml.template (original)
+++ hive/branches/tez/conf/hive-default.xml.template Fri Jan  3 21:09:55 2014
@@ -386,6 +386,12 @@
 </property>
 
 <property>
+  <name>hive.file.max.footer</name>
+  <value>100</value>
+  <description>maximum number of lines for footer user can define for a table file</description>
+</property>
+
+<property>
   <name>hive.map.aggr</name>
   <value>true</value>
   <description>Whether to use map-side aggregation in Hive Group By queries</description>

Modified: hive/branches/tez/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/itests/qtest/pom.xml?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/itests/qtest/pom.xml (original)
+++ hive/branches/tez/itests/qtest/pom.xml Fri Jan  3 21:09:55 2014
@@ -36,8 +36,8 @@
     <run_disabled>false</run_disabled>
     <clustermode></clustermode>
     <execute.beeline.tests>false</execute.beeline.tests>
-    <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q</minimr.query.files>
-    <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q</minimr.query.negative.files>
+    <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q</minimr.query.files>
+    <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q</minimr.query.negative.files>
     <minitez.query.files>tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q</minitez.query.files>
     <minitez.query.files.shared>join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q</minitez.query.files.shared>
     <beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rena
 me.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_o
 verwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Fri Jan  3 21:09:55 2014
@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.FooterBuffer;
 import org.apache.hadoop.hive.ql.io.HiveContextAwareRecordReader;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveRecordReader;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hive.ql.plan.Fe
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.DelegatedObjectInspectorFactory;
@@ -80,6 +83,9 @@ public class FetchOperator implements Se
   private PartitionDesc currPart;
   private TableDesc currTbl;
   private boolean tblDataDone;
+  private FooterBuffer footerBuffer = null;
+  private int headerCount = 0;
+  private int footerCount = 0;
 
   private boolean hasVC;
   private boolean isPartitioned;
@@ -527,6 +533,7 @@ public class FetchOperator implements Se
   public InspectableObject getNextRow() throws IOException {
     try {
       while (true) {
+        boolean opNotEOF = true;
         if (context != null) {
           context.resetRow();
         }
@@ -535,10 +542,49 @@ public class FetchOperator implements Se
           if (currRecReader == null) {
             return null;
           }
+
+          /**
+           * Start reading a new file.
+           * If file contains header, skip header lines before reading the records.
+           * If file contains footer, used FooterBuffer to cache and remove footer
+           * records at the end of the file.
+           */
+          headerCount = 0;
+          footerCount = 0;
+          TableDesc table = null;
+          if (currTbl != null) {
+            table = currTbl;
+          } else if (currPart != null) {
+            table = currPart.getTableDesc();
+          }
+          if (table != null) {
+            headerCount = Utilities.getHeaderCount(table);
+            footerCount = Utilities.getFooterCount(table, job);
+          }
+
+          // Skip header lines.
+          opNotEOF = Utilities.skipHeader(currRecReader, headerCount, key, value);
+
+          // Initialize footer buffer.
+          if (opNotEOF) {
+            if (footerCount > 0) {
+              footerBuffer = new FooterBuffer();
+              opNotEOF = footerBuffer.initializeBuffer(job, currRecReader, footerCount, key, value);
+            }
+          }
         }
 
-        boolean ret = currRecReader.next(key, value);
-        if (ret) {
+        if (opNotEOF && footerBuffer == null) {
+          /**
+           * When file doesn't end after skipping header line
+           * and there is no footer lines, read normally.
+           */
+          opNotEOF = currRecReader.next(key, value);
+        }
+        if (opNotEOF && footerBuffer != null) {
+          opNotEOF = footerBuffer.updateBuffer(job, currRecReader, key, value);
+        }
+        if (opNotEOF) {
          if (operator != null && context != null && context.inputFileChanged()) {
             // The child operators cleanup if input file has changed
             try {
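
FooterBuffer.java is copied unchanged from trunk, so its body does not appear in
this diff. Conceptually it is a sliding window over the last footerCount records:
a record is only released to the caller once footerCount newer records have been
read, so whatever is still buffered at end-of-file is exactly the footer and is
silently dropped. A simplified, hypothetical sketch of that idea (the real class
works on Writable key/value pairs):

    import java.util.ArrayDeque;

    // Hypothetical sketch of the FooterBuffer idea: hold back the most
    // recent `footerCount` rows so they can be discarded at end-of-file.
    class FooterWindow<T> {
      private final ArrayDeque<T> window = new ArrayDeque<T>();
      private final int footerCount;

      FooterWindow(int footerCount) {
        this.footerCount = footerCount;
      }

      /** Returns a row safe to emit, or null while the window is filling. */
      T push(T row) {
        window.addLast(row);
        return window.size() > footerCount ? window.removeFirst() : null;
      }
      // Rows still in `window` at EOF are the footer lines: never emitted.
    }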

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Jan  3 21:09:55 2014
@@ -95,6 +95,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -164,12 +165,14 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -3208,4 +3211,73 @@ public final class Utilities {
     + baseDir + " Giving up after " + MAX_ATTEMPS + " attemps");
 
   }
+
+  /**
+   * Skip header lines in the table file when reading the record.
+   *
+   * @param currRecReader
+   *          Record reader.
+   *
+   * @param headerCount
+   *          Header line number of the table files.
+   *
+   * @param key
+   *          Key of current reading record.
+   *
+   * @param value
+   *          Value of current reading record.
+   *
+   * @return Return true if there are 0 or more records left in the file
+   *         after skipping all headers, otherwise return false.
+   */
+  public static boolean skipHeader(RecordReader<WritableComparable, Writable> currRecReader,
+      int headerCount, WritableComparable key, Writable value) throws IOException {
+    while (headerCount > 0) {
+      if (!currRecReader.next(key, value))
+        return false;
+      headerCount--;
+    }
+    return true;
+  }
+
+  /**
+   * Get header line count for a table.
+   *
+   * @param table
+   *          Table description for target table.
+   *
+   */
+  public static int getHeaderCount(TableDesc table) throws IOException {
+    int headerCount;
+    try {
+      headerCount = Integer.parseInt(table.getProperties().getProperty(serdeConstants.HEADER_COUNT, "0"));
+    } catch (NumberFormatException nfe) {
+      throw new IOException(nfe);
+    }
+    return headerCount;
+  }
+
+  /**
+   * Get footer line count for a table.
+   *
+   * @param table
+   *          Table description for target table.
+   *
+   * @param job
+   *          Job configuration for current job.
+   */
+  public static int getFooterCount(TableDesc table, JobConf job) throws IOException {
+    int footerCount;
+    try {
+      footerCount = Integer.parseInt(table.getProperties().getProperty(serdeConstants.FOOTER_COUNT, "0"));
+      if (footerCount > HiveConf.getIntVar(job, HiveConf.ConfVars.HIVE_FILE_MAX_FOOTER)) {
+        throw new IOException("footer number exceeds the limit defined in hive.file.max.footer");
+      }
+    } catch (NumberFormatException nfe) {
+
+      // Footer line number must be set as an integer.
+      throw new IOException(nfe);
+    }
+    return footerCount;
+  }
 }
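
Both helpers reduce to "parse a table property as an integer, optionally
enforcing a cap". A standalone, hypothetical rendering of that logic for
experimentation outside Hive (class and method names are illustrative):

    import java.io.IOException;
    import java.util.Properties;

    // Hypothetical standalone version of the parse-and-cap logic in
    // getHeaderCount()/getFooterCount(); maxAllowed < 0 means "no cap".
    public class LineCountParser {
      static int parseLineCount(Properties props, String key, int maxAllowed)
          throws IOException {
        final int n;
        try {
          n = Integer.parseInt(props.getProperty(key, "0"));
        } catch (NumberFormatException nfe) {
          throw new IOException(key + " must be an integer", nfe);
        }
        if (maxAllowed >= 0 && n > maxAllowed) {
          throw new IOException(key + " exceeds the limit of " + maxAllowed);
        }
        return n;
      }

      public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.setProperty("skip.footer.line.count", "2");
        System.out.println(parseLineCount(props, "skip.footer.line.count", 100)); // 2
      }
    }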

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Fri Jan  3 21:09:55 2014
@@ -20,24 +20,38 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.FooterBuffer;
 import org.apache.hadoop.hive.ql.io.IOContext.Comparison;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.util.ReflectionUtils;
 
/** This class prepares an IOContext, and provides the ability to perform a binary search on the
   * data.  The binary search can be used by setting the value of inputFormatSorted in the
@@ -217,6 +231,10 @@ public abstract class HiveContextAwareRe
     }
   }
 
+  private FooterBuffer footerBuffer = null;
+  private int headerCount = 0;
+  private int footerCount = 0;
+
   public boolean doNext(K key, V value) throws IOException {
     if (this.isSorted) {
       if (this.getIOContext().shouldEndBinarySearch() ||
@@ -271,7 +289,57 @@ public abstract class HiveContextAwareRe
     }
 
     try {
-      return recordReader.next(key,  value);
+
+      /**
+       * When start reading new file, check header, footer rows.
+       * If file contains header, skip header lines before reading the records.
+       * If file contains footer, used a FooterBuffer to remove footer lines
+       * at the end of the table file.
+       **/
+      if (this.ioCxtRef.getCurrentBlockStart() == 0) {
+
+        // Check if the table file has header to skip.
+        Path filePath = this.ioCxtRef.getInputPath();
+        PartitionDesc part = null;
+        try {
+          Map<String, PartitionDesc> pathToPartitionInfo = Utilities
+              .getMapWork(jobConf).getPathToPartitionInfo();
+          part = HiveFileFormatUtils
+              .getPartitionDescFromPathRecursively(pathToPartitionInfo,
+                  filePath, IOPrepareCache.get().getPartitionDescMap());
+        } catch (AssertionError ae) {
+          LOG.info("Cannot get partition description from " + this.ioCxtRef.getInputPath()
+              + "because " + ae.getMessage());
+          part = null;
+        } catch (Exception e) {
+          LOG.info("Cannot get partition description from " + this.ioCxtRef.getInputPath()
+              + "because " + e.getMessage());
+          part = null;
+        }
+        TableDesc table = (part == null) ? null : part.getTableDesc();
+        if (table != null) {
+          headerCount = Utilities.getHeaderCount(table);
+          footerCount = Utilities.getFooterCount(table, jobConf);
+        }
+
+        // If input contains header, skip header.
+        if (!Utilities.skipHeader(recordReader, headerCount, (WritableComparable)key, (Writable)value)) {
+          return false;
+        }
+        if (footerCount > 0) {
+          footerBuffer = new FooterBuffer();
+          if (!footerBuffer.initializeBuffer(jobConf, recordReader, footerCount, (WritableComparable)key, (Writable)value)) {
+            return false;
+          }
+        }
+      }
+      if (footerBuffer == null) {
+
+        // Table files don't have footer rows.
+        return recordReader.next(key,  value);
+      } else {
+        return footerBuffer.updateBuffer(jobConf, recordReader, (WritableComparable)key, (Writable)value);
+      }
     } catch (Exception e) {
       return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
     }
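
Note the gate on getCurrentBlockStart() == 0 above: header skipping and footer
buffering are initialized only when the reader is positioned at the start of a
file. That is sufficient because HiveInputFormat (next hunk) forces files with
headers or footers into a single split. A hypothetical, self-contained
simulation of the resulting row stream for skip.header.line.count=1 and
skip.footer.line.count=2:

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical simulation of doNext() over one file: skip the header up
    // front, then delay each row by footerCount positions so the trailing
    // footer rows are never emitted. No Hive classes involved.
    public class HeaderFooterDemo {
      public static void main(String[] args) {
        List<String> file =
            Arrays.asList("HEADER", "row1", "row2", "row3", "FOOT1", "FOOT2");
        int headerCount = 1;
        int footerCount = 2;

        ArrayDeque<String> window = new ArrayDeque<String>();
        for (String line : file.subList(headerCount, file.size())) {
          window.addLast(line);
          if (window.size() > footerCount) {
            System.out.println(window.removeFirst()); // row1, row2, row3
          }
        }
        // At EOF the window holds FOOT1 and FOOT2, exactly the footer rows.
      }
    }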

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Fri Jan  3 21:09:55 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configurab
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
@@ -266,6 +268,19 @@ public class HiveInputFormat<K extends W
 
     FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
     conf.setInputFormat(inputFormat.getClass());
+
+    int headerCount = 0;
+    int footerCount = 0;
+    if (table != null) {
+      headerCount = Utilities.getHeaderCount(table);
+      footerCount = Utilities.getFooterCount(table, conf);
+      if (headerCount != 0 || footerCount != 0) {
+        
+        // Input file has header or footer, cannot be splitted.
+        conf.setLong("mapred.min.split.size", Long.MAX_VALUE);
+      }
+    }
+
     InputSplit[] iss = inputFormat.getSplits(conf, splits);
     for (InputSplit is : iss) {
       result.add(new HiveInputSplit(is, inputFormatClass.getName()));
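
Raising mapred.min.split.size to Long.MAX_VALUE is what makes header/footer
handling safe: under the classic FileInputFormat computation,
splitSize = max(minSize, min(goalSize, blockSize)), a saturated minSize yields
one split per file, so every reader sees the file from its first byte. A small
illustration of that formula (assumed from Hadoop's mapred FileInputFormat,
simplified here):

    // Hypothetical illustration of why a huge mapred.min.split.size
    // disables splitting: splitSize = max(minSize, min(goalSize, blockSize)).
    public class SplitSizeDemo {
      static long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
      }

      public static void main(String[] args) {
        long blockSize = 128L << 20; // 128 MB HDFS block
        long goalSize = 64L << 20;   // totalSize / requested number of splits
        System.out.println(computeSplitSize(goalSize, 1L, blockSize));             // 64 MB: file may be split
        System.out.println(computeSplitSize(goalSize, Long.MAX_VALUE, blockSize)); // one split per file
      }
    }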

Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java Fri Jan  3 21:09:55 2014
@@ -24,13 +24,19 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.LinkedHashMap;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat.HiveInputSplit;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan;
@@ -127,6 +133,15 @@ public class TestHiveBinarySearchRecordR
     when(rcfReader.getPos()).thenReturn(50L);
     conf = new JobConf();
     conf.setBoolean("hive.input.format.sorted", true);
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<String, PartitionDesc> pt = new LinkedHashMap<String, PartitionDesc>();
+    pt.put("/tmp/testfolder", partDesc);
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    Utilities.setMapRedWork(conf, mrwork,"/tmp/" + System.getProperty("user.name") + "/hive");
+
     hiveSplit = new TestHiveInputSplit();
     hbsReader = new TestHiveRecordReader(rcfReader, conf);
     hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader);

Modified: hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java (original)
+++ hive/branches/tez/ql/src/test/org/apache/hadoop/hive/ql/io/TestSymlinkTextInputFormat.java Fri Jan  3 21:09:55 2014
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 
 import junit.framework.TestCase;
@@ -38,6 +39,9 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -70,6 +74,15 @@ public class TestSymlinkTextInputFormat 
   protected void setUp() throws IOException {
     conf = new Configuration();
     job = new JobConf(conf);
+
+    TableDesc tblDesc = Utilities.defaultTd;
+    PartitionDesc partDesc = new PartitionDesc(tblDesc, null);
+    LinkedHashMap<String, PartitionDesc> pt = new LinkedHashMap<String, PartitionDesc>();
+    pt.put("/tmp/testfolder", partDesc);
+    MapredWork mrwork = new MapredWork();
+    mrwork.getMapWork().setPathToPartitionInfo(pt);
+    Utilities.setMapRedWork(job, mrwork,"/tmp/" + System.getProperty("user.name") + "/hive");
+
     fileSystem = FileSystem.getLocal(conf);
     testDir = new Path(System.getProperty("test.tmp.dir", System.getProperty(
         "user.dir", new File(".").getAbsolutePath()))

Modified: hive/branches/tez/serde/if/serde.thrift
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/if/serde.thrift?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/serde/if/serde.thrift (original)
+++ hive/branches/tez/serde/if/serde.thrift Fri Jan  3 21:09:55 2014
@@ -37,6 +37,8 @@ const string LINE_DELIM = "line.delim"
 const string MAPKEY_DELIM = "mapkey.delim"
 const string QUOTE_CHAR = "quote.delim"
 const string ESCAPE_CHAR = "escape.delim"
+const string HEADER_COUNT = "skip.header.line.count"
+const string FOOTER_COUNT = "skip.footer.line.count"
 
 typedef string PrimitiveType
 typedef string CollectionType

Modified: hive/branches/tez/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java?rev=1555255&r1=1555254&r2=1555255&view=diff
==============================================================================
--- hive/branches/tez/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java (original)
+++ hive/branches/tez/serde/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/serde/serdeConstants.java Fri Jan  3 21:09:55 2014
@@ -61,6 +61,10 @@ public class serdeConstants {
 
   public static final String ESCAPE_CHAR = "escape.delim";
 
+  public static final String HEADER_COUNT = "skip.header.line.count";
+
+  public static final String FOOTER_COUNT = "skip.footer.line.count";
+
   public static final String VOID_TYPE_NAME = "void";
 
   public static final String BOOLEAN_TYPE_NAME = "boolean";
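
For completeness, a hypothetical probe showing how the regenerated constants
pair with a table's properties (cf. Utilities.getHeaderCount above; plain
java.util.Properties, no live metastore needed):

    import java.util.Properties;

    import org.apache.hadoop.hive.serde.serdeConstants;

    // Hypothetical: read the per-table skip counts through the generated
    // constants added in this commit.
    public class SkipCountProbe {
      public static void main(String[] args) {
        Properties tblProps = new Properties();
        tblProps.setProperty(serdeConstants.HEADER_COUNT, "1"); // skip.header.line.count
        tblProps.setProperty(serdeConstants.FOOTER_COUNT, "2"); // skip.footer.line.count
        System.out.println(tblProps.getProperty(serdeConstants.HEADER_COUNT, "0")); // 1
        System.out.println(tblProps.getProperty(serdeConstants.FOOTER_COUNT, "0")); // 2
      }
    }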


