hive-commits mailing list archives

From: vik...@apache.org
Subject: svn commit: r1580992 [1/5] - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ itests/qtest/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/...
Date: Mon, 24 Mar 2014 18:50:53 GMT
Author: vikram
Date: Mon Mar 24 18:50:51 2014
New Revision: 1580992

URL: http://svn.apache.org/r1580992
Log:
HIVE-6455 : Scalable dynamic partitioning and bucketing optimization (Prasanth J via Vikram Dixit)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
    hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
    hive/trunk/ql/src/test/results/clientpositive/dynpart_sort_optimization.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/dynpart_sort_optimization.q.out
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/itests/qtest/pom.xml
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
    hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition.q
    hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q
    hive/trunk/ql/src/test/results/clientnegative/dyn_part_max_per_node.q.out
    hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_16.q.out
    hive/trunk/ql/src/test/results/clientpositive/infer_bucket_sort_dyn_part.q.out
    hive/trunk/ql/src/test/results/clientpositive/insert_into6.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part1.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part10.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part14.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part3.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part4.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part5.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part8.q.out
    hive/trunk/ql/src/test/results/clientpositive/load_dyn_part9.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge3.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge4.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition3.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition4.q.out
    hive/trunk/ql/src/test/results/clientpositive/merge_dynamic_partition5.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_analyze.q.out
    hive/trunk/ql/src/test/results/clientpositive/orc_create.q.out
    hive/trunk/ql/src/test/results/clientpositive/sample10.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats2.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats4.q.out
    hive/trunk/ql/src/test/results/clientpositive/stats_empty_dyn_part.q.out
    hive/trunk/ql/src/test/results/clientpositive/tez/orc_analyze.q.out
    hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/union.q.xml

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Mon Mar 24 18:50:51 2014
@@ -609,6 +609,10 @@ public class HiveConf extends Configurat
     HIVEOPTSORTMERGEBUCKETMAPJOIN("hive.optimize.bucketmapjoin.sortedmerge", false), // try to use sorted merge bucket map join
     HIVEOPTREDUCEDEDUPLICATION("hive.optimize.reducededuplication", true),
     HIVEOPTREDUCEDEDUPLICATIONMINREDUCER("hive.optimize.reducededuplication.min.reducer", 4),
+    // when enabled, the dynamic partitioning column will be globally sorted.
+    // this way we can keep only one record writer open for each partition value
+    // in the reducer, thereby reducing the memory pressure on reducers
+    HIVEOPTSORTDYNAMICPARTITION("hive.optimize.sort.dynamic.partition", true),
 
     HIVESAMPLINGFORORDERBY("hive.optimize.sampling.orderby", false),
     HIVESAMPLINGNUMBERFORORDERBY("hive.optimize.sampling.orderby.number", 1000),

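The new flag is registered in HiveConf.ConfVars like any other optimizer switch. As a minimal sketch (not part of this commit) of how a client could read or override it, assuming only the hive-common classes touched above:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class SortDynPartFlagExample {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // defaults to true, as declared in HIVEOPTSORTDYNAMICPARTITION above
        boolean enabled = HiveConf.getBoolVar(conf,
            HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION);
        System.out.println("hive.optimize.sort.dynamic.partition = " + enabled);
        // HiveConf extends Configuration, so the plain string setter works as well
        conf.set("hive.optimize.sort.dynamic.partition", "false");
      }
    }
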
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Mon Mar 24 18:50:51 2014
@@ -503,6 +503,14 @@
 </property>
 
 <property>
+  <name>hive.optimize.sort.dynamic.partition</name>
+  <value>true</value>
+  <description>When enabled, the dynamic partitioning column will be globally sorted.
+  This way we can keep only one record writer open for each partition value
+  in the reducer, thereby reducing the memory pressure on reducers.</description>
+</property>
+
+<property>
   <name>hive.optimize.skewjoin.compiletime</name>
   <value>false</value>
   <description>Whether to create a separate plan for skewed keys for the tables in the join.

Modified: hive/trunk/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/trunk/itests/qtest/pom.xml?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/itests/qtest/pom.xml (original)
+++ hive/trunk/itests/qtest/pom.xml Mon Mar 24 18:50:51 2014
@@ -39,7 +39,7 @@
     <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q</minimr.query.files>
     <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q</minimr.query.negative.files>
     <minitez.query.files>mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q</minitez.query.files>
-    <minitez.query.files.shared>orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
+    <minitez.query.files.shared>dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
     <beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rena
 me.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_o
 verwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
   </properties>
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Mar 24 18:50:51 2014
@@ -33,25 +33,25 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc.DPSortState;
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.ql.stats.CounterStatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsCollectionTaskIndependent;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -68,6 +68,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import com.google.common.collect.Lists;
+
 /**
  * File Sink operator implementation.
  **/
@@ -93,6 +95,12 @@ public class FileSinkOperator extends Te
   protected transient boolean statsCollectRawDataSize;
   private transient boolean[] statsFromRecordWriter;
   private transient boolean isCollectRWStats;
+  private transient FSPaths prevFsp;
+  private transient FSPaths fpaths;
+  private transient ObjectInspector keyOI;
+  private transient List<Object> keyWritables;
+  private transient List<String> keys;
+  private transient int numKeyColToRead;
 
   /**
    * RecordWriter.
@@ -318,6 +326,20 @@ public class FileSinkOperator extends Te
         lbSetup();
       }
 
+      int numPart = 0;
+      int numBuck = 0;
+      if (conf.getPartitionCols() != null && !conf.getPartitionCols().isEmpty()) {
+        numPart = conf.getPartitionCols().size();
+      }
+
+      // the bucket number exists only in PARTITION_BUCKET_SORTED mode
+      if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+        numBuck = 1;
+      }
+      numKeyColToRead = numPart + numBuck;
+      keys = Lists.newArrayListWithCapacity(numKeyColToRead);
+      keyWritables = Lists.newArrayListWithCapacity(numKeyColToRead);
+
       if (!bDynParts) {
         fsp = new FSPaths(specPath);
 
@@ -423,56 +445,7 @@ public class FileSinkOperator extends Te
           bucketMap.put(bucketNum, filesIdx);
           taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
         }
-        if (isNativeTable) {
-          fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
-          LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
-          fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
-          LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
-        } else {
-          fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
-        }
-        try {
-          // The reason to keep these instead of using
-          // OutputFormat.getRecordWriter() is that
-          // getRecordWriter does not give us enough control over the file name that
-          // we create.
-          String extension = Utilities.getFileExtension(jc, isCompressed,
-              hiveOutputFormat);
-          if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
-            fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
-          } else {
-            fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
-          }
-
-        } catch (Exception e) {
-          e.printStackTrace();
-          throw new HiveException(e);
-        }
-        LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
-
-        if (isNativeTable) {
-          try {
-            // in recent hadoop versions, use deleteOnExit to clean tmp files.
-            autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
-          } catch (IOException e) {
-            throw new HiveException(e);
-          }
-        }
-
-        Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
-        // only create bucket files only if no dynamic partitions,
-        // buckets of dynamic partitions will be created for each newly created partition
-        fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
-            jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
-            reporter);
-        // If the record writer provides stats, get it from there instead of the serde
-        statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
-        // increment the CREATED_FILES counter
-        if (reporter != null) {
-          reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
-                               Operator.HIVECOUNTERCREATEDFILES,
-                               1);
-        }
+        createBucketForFileIdx(fsp, filesIdx);
         filesIdx++;
       }
       assert filesIdx == numFiles;
@@ -481,8 +454,6 @@ public class FileSinkOperator extends Te
       if (isNativeTable) {
         autoDelete = fs.deleteOnExit(fsp.outPaths[0]);
       }
-    } catch (HiveException e) {
-      throw e;
     } catch (Exception e) {
       e.printStackTrace();
       throw new HiveException(e);
@@ -491,6 +462,52 @@ public class FileSinkOperator extends Te
     filesCreated = true;
   }
 
+  protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException {
+    try {
+      if (isNativeTable) {
+        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
+        LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
+        fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
+        LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
+      } else {
+        fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
+      }
+      // The reason to keep these instead of using
+      // OutputFormat.getRecordWriter() is that
+      // getRecordWriter does not give us enough control over the file name that
+      // we create.
+      String extension = Utilities.getFileExtension(jc, isCompressed, hiveOutputFormat);
+      if (!bDynParts && !this.isSkewedStoredAsSubDirectories) {
+        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, parent, extension);
+      } else {
+        fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
+      }
+
+      LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
+
+      if (isNativeTable) {
+        // in recent hadoop versions, use deleteOnExit to clean tmp files.
+        autoDelete = fs.deleteOnExit(fsp.outPaths[filesIdx]);
+      }
+
+      Utilities.copyTableJobPropertiesToConf(conf.getTableInfo(), jc);
+      // create bucket files only if there are no dynamic partitions;
+      // buckets of dynamic partitions will be created for each newly created partition
+      fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(jc, conf.getTableInfo(),
+          outputClass, conf, fsp.outPaths[filesIdx], reporter);
+      // If the record writer provides stats, get it from there instead of the serde
+      statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
+      // increment the CREATED_FILES counter
+      if (reporter != null) {
+        reporter.incrCounter(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVECOUNTERGROUP),
+            Operator.HIVECOUNTERCREATEDFILES, 1);
+      }
+
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
+  }
+
   /**
    * Report status to JT so that JT won't kill this task if closing takes too long
    * due to too many files to close and the NN is overloaded.
@@ -516,8 +533,6 @@ public class FileSinkOperator extends Te
     String lbDirName = null;
     lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
 
-    FSPaths fpaths;
-
     if (!bDynParts && !filesCreated) {
       if (lbDirName != null) {
         FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
@@ -549,7 +564,13 @@ public class FileSinkOperator extends Te
         }
         // use SubStructObjectInspector to serialize the non-partitioning columns in the input row
         recordValue = serializer.serialize(row, subSetOI);
-        fpaths = getDynOutPaths(dpVals, lbDirName);
+
+        // when dynamic partition sorting is not used, the DPSortState will be NONE,
+        // in which case we fall back to the old method of file system path creation,
+        // i.e., having as many record writers as distinct values in the partition columns
+        if (conf.getDpSortState().equals(DPSortState.NONE)) {
+          fpaths = getDynOutPaths(dpVals, lbDirName);
+        }
 
       } else {
         if (lbDirName != null) {
@@ -648,8 +669,10 @@ public class FileSinkOperator extends Te
       fsp2.taskOutputTempPath =
         new Path(fsp2.taskOutputTempPath, dirName);
     }
-    createBucketFiles(fsp2);
-    valToPaths.put(dirName, fsp2);
+    if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+      createBucketFiles(fsp2);
+      valToPaths.put(dirName, fsp2);
+    }
     return fsp2;
   }
 
@@ -706,9 +729,16 @@ public class FileSinkOperator extends Te
     // get the path corresponding to the dynamic partition columns,
     String dpDir = getDynPartDirectory(row, dpColNames, numDynParts);
 
+    String pathKey = null;
     if (dpDir != null) {
-      dpDir = appendListBucketingDirName(lbDirName, dpDir);
-      FSPaths fsp2 = valToPaths.get(dpDir);
+      dpDir = appendToSource(lbDirName, dpDir);
+      pathKey = dpDir;
+      if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+        String buckNum = row.get(row.size() - 1);
+        taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum);
+        pathKey = appendToSource(taskId, dpDir);
+      }
+      FSPaths fsp2 = valToPaths.get(pathKey);
 
       if (fsp2 == null) {
         // check # of dp
@@ -718,7 +748,39 @@ public class FileSinkOperator extends Te
                ErrorMsg.DYNAMIC_PARTITIONS_TOO_MANY_PER_NODE_ERROR.getErrorCodedMsg()
                + "Maximum was set to: " + maxPartitions);
         }
+
+        if (!conf.getDpSortState().equals(DPSortState.NONE) && prevFsp != null) {
+          // close the previous fsp as it is no longer needed
+          prevFsp.closeWriters(false);
+
+          // since we are closing the previous fsp's record writers, we need to see if we can get
+          // stats from the record writer and store in the previous fsp that is cached
+          if (conf.isGatherStats() && isCollectRWStats) {
+            FSRecordWriter outWriter = prevFsp.outWriters[0];
+            if (outWriter != null) {
+              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+              if (stats != null) {
+                prevFsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                prevFsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+              }
+            }
+          }
+
+          // release the writer reference so it can be garbage collected
+          prevFsp.outWriters[0] = null;
+
+          prevFsp = null;
+        }
+
         fsp2 = createNewPaths(dpDir);
+        if (prevFsp == null) {
+          prevFsp = fsp2;
+        }
+
+        if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+          createBucketForFileIdx(fsp2, 0);
+          valToPaths.put(pathKey, fsp2);
+        }
       }
       fp = fsp2;
     } else {
@@ -728,17 +790,16 @@ public class FileSinkOperator extends Te
   }
 
   /**
-   * Append list bucketing dir name to original dir name.
-   * Skewed columns cannot be partitioned columns.
-   * @param lbDirName
-   * @param dpDir
+   * Append a directory name to a source directory name.
+   * @param appendDir directory name to append (may be null)
+   * @param srcDir source directory name
    * @return
    */
-  private String appendListBucketingDirName(String lbDirName, String dpDir) {
-    StringBuilder builder = new StringBuilder(dpDir);
-    dpDir = (lbDirName == null) ? dpDir : builder.append(Path.SEPARATOR).append(lbDirName)
+  private String appendToSource(String appendDir, String srcDir) {
+    StringBuilder builder = new StringBuilder(srcDir);
+    srcDir = (appendDir == null) ? srcDir : builder.append(Path.SEPARATOR).append(appendDir)
           .toString();
-    return dpDir;
+    return srcDir;
   }
 
   // given the current input row, the mapping for input col info to dp columns, and # of dp cols,
@@ -750,6 +811,26 @@ public class FileSinkOperator extends Te
   }
 
   @Override
+  public void startGroup() throws HiveException {
+    if (!conf.getDpSortState().equals(DPSortState.NONE)) {
+      keyOI = getGroupKeyObjectInspector();
+      keys.clear();
+      keyWritables.clear();
+      ObjectInspectorUtils.partialCopyToStandardObject(keyWritables, getGroupKeyObject(), 0,
+          numKeyColToRead, (StructObjectInspector) keyOI, ObjectInspectorCopyOption.WRITABLE);
+
+      for (Object o : keyWritables) {
+        if (o == null || o.toString().length() == 0) {
+          keys.add(dpCtx.getDefaultPartitionName());
+        } else {
+          keys.add(o.toString());
+        }
+      }
+      fpaths = getDynOutPaths(keys, null);
+    }
+  }
+
+  @Override
   public void closeOp(boolean abort) throws HiveException {
     if (!bDynParts && !filesCreated) {
       createBucketFiles(fsp);
@@ -908,6 +989,20 @@ public class FileSinkOperator extends Te
       String fspKey = entry.getKey();     // DP/LB
       FSPaths fspValue = entry.getValue();
 
+      // for bucketed tables, hive.optimize.sort.dynamic.partition optimization
+      // adds the taskId to the fspKey.
+      if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+        taskID = Utilities.getTaskIdFromFilename(fspKey);
+        // if the length of (prefix/ds=__HIVE_DEFAULT_PARTITION__/000000_0) is greater than the max key prefix
+        // and (prefix/ds=10/000000_0) is less than the max key prefix, then the former will get hashed
+        // to a smaller prefix (MD5hash/000000_0) while the latter is stored as-is in the staging stats table.
+        // When stats get aggregated in StatsTask, only the keys that start with "prefix" will be fetched.
+        // Now that (prefix/ds=__HIVE_DEFAULT_PARTITION__) is hashed to a smaller prefix it will
+        // not be retrieved from the staging table and hence not aggregated. To avoid this issue
+        // we remove the taskId from the key, which is redundant anyway.
+        fspKey = fspKey.split(taskID)[0];
+      }
+
       // split[0] = DP, split[1] = LB
       String[] split = splitKey(fspKey);
       String dpSpec = split[0];

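Taken together, the FileSinkOperator changes exploit the fact that rows now arrive grouped by the dynamic partition key (and bucket number), so only the writer for the current key has to stay open while the previous one is closed and released, as in the prevFsp handling above. A self-contained sketch of that pattern, using a hypothetical PartitionWriter in place of Hive's FSRecordWriter:

    import java.util.Arrays;
    import java.util.List;

    public class OneWriterPerGroupSketch {
      // hypothetical stand-in for Hive's FSRecordWriter
      static class PartitionWriter {
        final String partition;
        PartitionWriter(String partition) {
          this.partition = partition;
          System.out.println("open  " + partition);
        }
        void write(String row) { System.out.println("write " + partition + ": " + row); }
        void close() { System.out.println("close " + partition); }
      }

      public static void main(String[] args) {
        // rows already sorted by partition value, as the inserted ReduceSink guarantees
        List<String[]> rows = Arrays.asList(
            new String[] {"ds=2014-03-23", "a"},
            new String[] {"ds=2014-03-23", "b"},
            new String[] {"ds=2014-03-24", "c"});

        PartitionWriter current = null;   // analogous to prevFsp: only one writer open at a time
        String currentKey = null;
        for (String[] row : rows) {
          String key = row[0];
          if (!key.equals(currentKey)) {
            if (current != null) {
              current.close();            // close the previous partition's writer before opening the next
            }
            current = new PartitionWriter(key);
            currentKey = key;
          }
          current.write(row[1]);
        }
        if (current != null) {
          current.close();
        }
      }
    }

With unsorted input, every distinct partition value would instead need its own writer open for the lifetime of the task, which is the memory pressure this optimization removes.
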
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Mon Mar 24 18:50:51 2014
@@ -492,6 +492,8 @@ public abstract class Operator<T extends
 
     LOG.debug("Starting group for children:");
     for (Operator<? extends OperatorDesc> op : childOperators) {
+      op.setGroupKeyObjectInspector(groupKeyOI);
+      op.setGroupKeyObject(groupKeyObject);
       op.startGroup();
     }
 
@@ -958,6 +960,7 @@ public abstract class Operator<T extends
   }
 
   protected transient Object groupKeyObject;
+  protected transient ObjectInspector groupKeyOI;
 
   public String getOperatorId() {
     return operatorId;
@@ -1254,4 +1257,12 @@ public abstract class Operator<T extends
       LOG.warn("Cannot set stats when there's no descriptor: "+this);
     }
   }
+
+  public void setGroupKeyObjectInspector(ObjectInspector keyObjectInspector) {
+    this.groupKeyOI = keyObjectInspector;
+  }
+
+  public ObjectInspector getGroupKeyObjectInspector() {
+    return groupKeyOI;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Mon Mar 24 18:50:51 2014
@@ -64,6 +64,36 @@ public class OperatorUtils {
     return found;
   }
 
+  public static <T> Set<T> findOperatorsUpstream(Operator<?> start, Class<T> clazz) {
+    return findOperatorsUpstream(start, clazz, new HashSet<T>());
+  }
+
+  public static <T> T findSingleOperatorUpstream(Operator<?> start, Class<T> clazz) {
+    Set<T> found = findOperatorsUpstream(start, clazz, new HashSet<T>());
+    return found.size() == 1 ? found.iterator().next() : null;
+  }
+
+  public static <T> Set<T> findOperatorsUpstream(Collection<Operator<?>> starts, Class<T> clazz) {
+    Set<T> found = new HashSet<T>();
+    for (Operator<?> start : starts) {
+      findOperatorsUpstream(start, clazz, found);
+    }
+    return found;
+  }
+
+  @SuppressWarnings("unchecked")
+  private static <T> Set<T> findOperatorsUpstream(Operator<?> start, Class<T> clazz, Set<T> found) {
+    if (clazz.isInstance(start)) {
+      found.add((T) start);
+    }
+    if (start.getParentOperators() != null) {
+      for (Operator<?> parent : start.getParentOperators()) {
+        findOperatorsUpstream(parent, clazz, found);
+      }
+    }
+    return found;
+  }
+
   public static void setChildrenCollector(List<Operator<? extends OperatorDesc>> childOperators, OutputCollector out) {
     if (childOperators == null) {
       return;

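The new upstream-search helpers are parent-direction analogues of the existing findOperators methods; later in this patch they are used to locate a ReduceSinkOperator above a FileSinkOperator. A brief usage sketch (hypothetical wrapper class and method names), assuming fsOp comes from an already-built operator tree:

    import java.util.Set;
    import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
    import org.apache.hadoop.hive.ql.exec.OperatorUtils;
    import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;

    public class UpstreamSearchExample {
      // returns the single ReduceSink feeding this FileSink, or null if there is not exactly one
      static ReduceSinkOperator findUpstreamReduceSink(FileSinkOperator fsOp) {
        Set<ReduceSinkOperator> upstreamRs =
            OperatorUtils.findOperatorsUpstream(fsOp, ReduceSinkOperator.class);
        return upstreamRs.size() == 1
            ? OperatorUtils.findSingleOperatorUpstream(fsOp, ReduceSinkOperator.class)
            : null;
      }
    }
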
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Mon Mar 24 18:50:51 2014
@@ -45,8 +45,9 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 
 /**
@@ -78,6 +79,10 @@ public class ReduceSinkOperator extends 
    * goes to. Partition columns are not passed to reducer.
    */
   protected transient ExprNodeEvaluator[] partitionEval;
+  /**
+   * Evaluators for bucketing columns. This is used to compute bucket number.
+   */
+  protected transient ExprNodeEvaluator[] bucketEval = null;
 
   // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
   // ready
@@ -132,6 +137,18 @@ public class ReduceSinkOperator extends 
         partitionEval[i++] = index < 0 ? ExprNodeEvaluatorFactory.get(e): keyEval[index];
       }
 
+      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
+        bucketEval = new ExprNodeEvaluator[conf.getBucketCols().size()];
+
+        i = 0;
+        for (ExprNodeDesc e : conf.getBucketCols()) {
+          int index = ExprNodeDescUtils.indexOf(e, keys);
+          bucketEval[i++] = index < 0 ? ExprNodeEvaluatorFactory.get(e) : keyEval[index];
+        }
+
+        buckColIdxInKey = conf.getPartitionCols().size();
+      }
+
       tag = conf.getTag();
       tagByte[0] = (byte) tag;
       LOG.info("Using tag = " + tag);
@@ -167,6 +184,8 @@ public class ReduceSinkOperator extends 
   protected transient ObjectInspector keyObjectInspector;
   protected transient ObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
+  transient ObjectInspector[] bucketObjectInspectors = null;
+  transient int buckColIdxInKey;
 
   protected transient Object[] cachedValues;
   protected transient List<List<Integer>> distinctColIndices;
@@ -245,9 +264,12 @@ public class ReduceSinkOperator extends 
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
             conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
-        valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval, conf
-            .getOutputValueColumnNames(), rowInspector);
+        valueObjectInspector = initEvaluatorsAndReturnStruct(valueEval,
+            conf.getOutputValueColumnNames(), rowInspector);
         partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
+        if (bucketEval != null) {
+          bucketObjectInspectors = initEvaluators(bucketEval, rowInspector);
+        }
         int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
         int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
         cachedKeys = new Object[numKeys][keyLen];
@@ -256,6 +278,14 @@ public class ReduceSinkOperator extends 
 
       // Determine distKeyLength (w/o distincts), and then add the first if present.
       populateCachedDistributionKeys(row, 0);
+
+      // replace bucketing columns with hashcode % numBuckets
+      int buckNum = 0;
+      if (bucketEval != null) {
+        buckNum = computeBucketNumber(row, conf.getNumBuckets());
+        cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+      }
+
       HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
       int distKeyLength = firstKey.getDistKeyLength();
       if (numDistinctExprs > 0) {
@@ -268,7 +298,13 @@ public class ReduceSinkOperator extends 
       if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
       // Compute value and hashcode - we'd either store or forward them.
       BytesWritable value = makeValueWritable(row);
-      int hashCode = computeHashCode(row);
+      int hashCode = 0;
+      if (bucketEval == null) {
+        hashCode = computeHashCode(row);
+      } else {
+        hashCode = computeHashCode(row, buckNum);
+      }
+
       if (firstIndex == TopNHash.FORWARD) {
         firstKey.setHashCode(hashCode);
         collect(firstKey, value);
@@ -292,6 +328,20 @@ public class ReduceSinkOperator extends 
     }
   }
 
+  private int computeBucketNumber(Object row, int numBuckets) throws HiveException {
+    int buckNum = 0;
+    for (int i = 0; i < bucketEval.length; i++) {
+      Object o = bucketEval[i].evaluate(row);
+      buckNum = buckNum * 31 + ObjectInspectorUtils.hashCode(o, bucketObjectInspectors[i]);
+    }
+
+    if (buckNum < 0) {
+      buckNum = -1 * buckNum;
+    }
+
+    return buckNum % numBuckets;
+  }
+
   private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
     for (int i = 0; i < numDistributionKeys; i++) {
       cachedKeys[index][i] = keyEval[i].evaluate(row);
@@ -340,6 +390,33 @@ public class ReduceSinkOperator extends 
     return keyHashCode;
   }
 
+  private int computeHashCode(Object row, int buckNum) throws HiveException {
+    // Evaluate the HashCode
+    int keyHashCode = 0;
+    if (partitionEval.length == 0) {
+      // If no partition cols, just distribute the data uniformly to provide better
+      // load balance. If the requirement is to have a single reducer, we should set
+      // the number of reducers to 1.
+      // Use a constant seed to make the code deterministic.
+      if (random == null) {
+        random = new Random(12345);
+      }
+      keyHashCode = random.nextInt();
+    } else {
+      // partitionEval includes all distribution columns, i.e.,
+      // partition columns + the bucket number column. The bucket number column is
+      // initialized with -1; ignore it and use the computed bucket number instead
+      for (int i = 0; i < partitionEval.length - 1; i++) {
+        Object o = partitionEval[i].evaluate(row);
+        keyHashCode = keyHashCode * 31
+            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      }
+
+      keyHashCode = keyHashCode * 31 + buckNum;
+    }
+    return keyHashCode;
+  }
+
   // Serialize the keys and append the tag
   protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
     BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);

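The bucket number computed above is a 31-based rolling hash over the bucketing columns, negated if negative and reduced modulo the bucket count; the partition hash then folds that bucket number in as its last term. A self-contained sketch of the same arithmetic, with Object.hashCode standing in for ObjectInspectorUtils.hashCode (the exact values differ from Hive's ObjectInspector-based hashing, but the combination logic is the same):

    public class BucketNumberSketch {
      // mirrors computeBucketNumber above
      static int computeBucketNumber(Object[] bucketCols, int numBuckets) {
        int buckNum = 0;
        for (Object col : bucketCols) {
          buckNum = buckNum * 31 + (col == null ? 0 : col.hashCode());
        }
        if (buckNum < 0) {
          buckNum = -1 * buckNum;   // negate if negative, as in the original
        }
        return buckNum % numBuckets;
      }

      public static void main(String[] args) {
        // e.g. a table bucketed into 4 buckets on a single column
        System.out.println(computeBucketNumber(new Object[] {"user_17"}, 4));
        System.out.println(computeBucketNumber(new Object[] {"user_42"}, 4));
      }
    }
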
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Mar 24 18:50:51 2014
@@ -1993,6 +1993,14 @@ public final class Utilities {
     return names;
   }
 
+  public static List<String> getInternalColumnNamesFromSignature(List<ColumnInfo> colInfos) {
+    List<String> names = new ArrayList<String>();
+    for (ColumnInfo ci : colInfos) {
+      names.add(ci.getInternalName());
+    }
+    return names;
+  }
+
   public static List<String> getColumnNames(Properties props) {
     List<String> names = new ArrayList<String>();
     String colNames = props.getProperty(serdeConstants.LIST_COLUMNS);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Mon Mar 24 18:50:51 2014
@@ -150,6 +150,7 @@ public class ExecReducer extends MapRedu
         ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
         ois.add(keyObjectInspector);
         ois.add(valueObjectInspector[tag]);
+        reducer.setGroupKeyObjectInspector(keyObjectInspector);
         rowObjectInspector[tag] = ObjectInspectorFactory
             .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
       }
@@ -227,8 +228,8 @@ public class ExecReducer extends MapRedu
 
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
         l4j.trace("Start Group");
-        reducer.startGroup();
         reducer.setGroupKeyObject(keyObject);
+        reducer.startGroup();
       }
       // System.err.print(keyObject.toString());
       while (values.hasNext()) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Mon Mar 24 18:50:51 2014
@@ -124,6 +124,7 @@ public class ReduceRecordProcessor  exte
           .getDeserializerClass(), null);
       inputKeyDeserializer.initialize(null, keyTableDesc.getProperties());
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      reducer.setGroupKeyObjectInspector(keyObjectInspector);
       valueTableDesc = new TableDesc[redWork.getTagToValueDesc().size()];
       for (int tag = 0; tag < redWork.getTagToValueDesc().size(); tag++) {
         // We should initialize the SerDe with the TypeInfo when available.
@@ -291,8 +292,8 @@ public class ReduceRecordProcessor  exte
 
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
         l4j.trace("Start Group");
-        reducer.startGroup();
         reducer.setGroupKeyObject(keyObject);
+        reducer.startGroup();
       }
 
       //process all the values we have for this key

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Mar 24 18:50:51 2014
@@ -97,6 +97,12 @@ public class Optimizer {
 
     transformations.add(new UnionProcessor());
     transformations.add(new JoinReorder());
+    if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONING) &&
+        HiveConf.getVar(hiveConf, HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict") &&
+        HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTDYNAMICPARTITION) &&
+        !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTLISTBUCKETING)) {
+      transformations.add(new SortedDynPartitionOptimizer());
+    }
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTREDUCEDEDUPLICATION)) {
       transformations.add(new ReduceSinkDeDuplication());
     }

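The guard above means the new transform is only registered when dynamic partitioning is enabled in nonstrict mode, the new hive.optimize.sort.dynamic.partition flag is on, and list bucketing optimization is off. A sketch (not part of this commit) that evaluates the same precondition against a HiveConf, using the instance setters assumed to mirror the static getters used above:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.conf.HiveConf.ConfVars;

    public class SortedDynPartPreconditions {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // the four settings checked in the Optimizer guard above
        conf.setBoolVar(ConfVars.DYNAMICPARTITIONING, true);
        conf.setVar(ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
        conf.setBoolVar(ConfVars.HIVEOPTSORTDYNAMICPARTITION, true);  // new in this patch, default true
        conf.setBoolVar(ConfVars.HIVEOPTLISTBUCKETING, false);

        boolean applies = HiveConf.getBoolVar(conf, ConfVars.DYNAMICPARTITIONING)
            && HiveConf.getVar(conf, ConfVars.DYNAMICPARTITIONINGMODE).equals("nonstrict")
            && HiveConf.getBoolVar(conf, ConfVars.HIVEOPTSORTDYNAMICPARTITION)
            && !HiveConf.getBoolVar(conf, ConfVars.HIVEOPTLISTBUCKETING);
        System.out.println("SortedDynPartitionOptimizer would be added: " + applies);
      }
    }
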
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java?rev=1580992&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SortedDynPartitionOptimizer.java Mon Mar 24 18:50:51 2014
@@ -0,0 +1,460 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.common.ObjectPair;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.ql.exec.ColumnInfo;
+import org.apache.hadoop.hive.ql.exec.ExtractOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.GraphWalker;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.lib.PreOrderWalker;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.OpParseContext;
+import org.apache.hadoop.hive.ql.parse.ParseContext;
+import org.apache.hadoop.hive.ql.parse.RowResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExtractDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.IntWritable;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * When dynamic partitioning (with or without bucketing and sorting) is enabled, this optimization
+ * sorts the records on partition, bucket and sort columns (in that order) before inserting records
+ * into the destination table. This enables each reducer to keep only one record writer open at a time,
+ * thereby reducing the memory pressure on the reducers. This optimization will force a reducer
+ * even when hive.enforce.bucketing and hive.enforce.sorting are set to false.
+ */
+public class SortedDynPartitionOptimizer implements Transform {
+
+  @Override
+  public ParseContext transform(ParseContext pCtx) throws SemanticException {
+
+    // create a walker which walks the tree in a DFS manner while maintaining the
+    // operator stack. The dispatcher generates the plan from the operator tree
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+
+    String FS = FileSinkOperator.getOperatorName() + "%";
+
+    opRules.put(new RuleRegExp("Sorted Dynamic Partition", FS), getSortDynPartProc(pCtx));
+
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, null);
+    GraphWalker ogw = new PreOrderWalker(disp);
+
+    ArrayList<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw.startWalking(topNodes, null);
+
+    return pCtx;
+  }
+
+  private NodeProcessor getSortDynPartProc(ParseContext pCtx) {
+    return new SortedDynamicPartitionProc(pCtx);
+  }
+
+  class SortedDynamicPartitionProc implements NodeProcessor {
+
+    private final Log LOG = LogFactory.getLog(SortedDynPartitionOptimizer.class);
+    protected ParseContext parseCtx;
+
+    public SortedDynamicPartitionProc(ParseContext pCtx) {
+      this.parseCtx = pCtx;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      // introduce RS and EX before FS. If the operator tree already contains
+      // RS then ReduceSinkDeDuplication optimization should merge them
+      FileSinkOperator fsOp = (FileSinkOperator) nd;
+
+      // if not dynamic partitioning then bail out
+      if (fsOp.getConf().getDynPartCtx() == null) {
+        LOG.debug("Bailing out of sort dynamic partition optimization as dpCtx is null");
+        return null;
+      }
+
+      // if an RS was inserted by enforce bucketing or sorting, we need to remove it,
+      // since ReduceSinkDeDuplication will not merge the two into a single RS.
+      // An RS inserted by enforce bucketing/sorting has the bucketing columns in its
+      // reduce sink key, whereas the RS inserted by this optimization has the
+      // partition columns followed by the bucket number followed by the sort columns in
+      // its reduce sink key. Since neither key is a prefix of the other,
+      // ReduceSinkDeDuplication will not merge them, resulting in 2 MR jobs.
+      // To avoid that we remove the RS (and EX) inserted by enforce bucketing/sorting.
+      removeRSInsertedByEnforceBucketing(fsOp);
+
+      // unlink connection between FS and its parent
+      Operator<? extends OperatorDesc> fsParent = fsOp.getParentOperators().get(0);
+      fsParent.getChildOperators().clear();
+
+      DynamicPartitionCtx dpCtx = fsOp.getConf().getDynPartCtx();
+      Table destTable = parseCtx.getFsopToTable().get(fsOp);
+      if (destTable == null) {
+        LOG.debug("Bailing out of sort dynamic partition optimization as destination table is null");
+        return null;
+      }
+      int numBuckets = destTable.getNumBuckets();
+
+      // if enforce bucketing/sorting is disabled, numBuckets will not be set.
+      // set the number of buckets here to ensure creation of empty buckets
+      dpCtx.setNumBuckets(numBuckets);
+
+      // Get the positions for partition, bucket and sort columns
+      List<Integer> bucketPositions = getBucketPositions(destTable.getBucketCols(),
+          destTable.getCols());
+      ObjectPair<List<Integer>, List<Integer>> sortOrderPositions = getSortPositionsOrder(
+          destTable.getSortCols(), destTable.getCols());
+      List<Integer> sortPositions = sortOrderPositions.getFirst();
+      List<Integer> sortOrder = sortOrderPositions.getSecond();
+      List<Integer> partitionPositions = getPartitionPositions(dpCtx, fsParent.getSchema());
+      List<ColumnInfo> colInfos = parseCtx.getOpParseCtx().get(fsParent).getRowResolver()
+          .getColumnInfos();
+      ArrayList<ExprNodeDesc> bucketColumns = getPositionsToExprNodes(bucketPositions, colInfos);
+
+      // update file sink descriptor
+      fsOp.getConf().setMultiFileSpray(false);
+      fsOp.getConf().setNumFiles(1);
+      fsOp.getConf().setTotalFiles(1);
+
+      // Create ReduceSinkDesc
+      RowResolver inputRR = parseCtx.getOpParseCtx().get(fsParent).getRowResolver();
+      ObjectPair<String, RowResolver> pair = copyRowResolver(inputRR);
+      RowResolver outRR = pair.getSecond();
+      ArrayList<ColumnInfo> valColInfo = Lists.newArrayList(fsParent.getSchema().getSignature());
+      ArrayList<ExprNodeDesc> newValueCols = Lists.newArrayList();
+      Map<String, ExprNodeDesc> colExprMap = Maps.newHashMap();
+      for (ColumnInfo ci : valColInfo) {
+        newValueCols.add(new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(), ci
+            .getTabAlias(), ci.isHiddenVirtualCol()));
+        colExprMap.put(ci.getInternalName(), newValueCols.get(newValueCols.size() - 1));
+      }
+      ReduceSinkDesc rsConf = getReduceSinkDesc(partitionPositions, sortPositions, sortOrder,
+          newValueCols, bucketColumns, numBuckets, fsParent);
+
+      // Create ReduceSink operator
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) putOpInsertMap(
+          OperatorFactory.getAndMakeChild(rsConf, new RowSchema(outRR.getColumnInfos()), fsParent),
+          outRR, parseCtx);
+      rsOp.setColumnExprMap(colExprMap);
+
+      // Create ExtractDesc
+      ObjectPair<String, RowResolver> exPair = copyRowResolver(outRR);
+      RowResolver exRR = exPair.getSecond();
+      ExtractDesc exConf = new ExtractDesc(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo,
+          Utilities.ReduceField.VALUE.toString(), "", false));
+
+      // Create Extract Operator
+      ExtractOperator exOp = (ExtractOperator) putOpInsertMap(
+          OperatorFactory.getAndMakeChild(exConf, new RowSchema(exRR.getColumnInfos()), rsOp),
+          exRR, parseCtx);
+
+      // link EX to FS
+      fsOp.getParentOperators().clear();
+      fsOp.getParentOperators().add(exOp);
+      exOp.getChildOperators().add(fsOp);
+
+      // Set whether the data is partition sorted or partition-bucket sorted
+      fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_SORTED);
+      if (bucketColumns.size() > 0) {
+        fsOp.getConf().setDpSortState(FileSinkDesc.DPSortState.PARTITION_BUCKET_SORTED);
+      }
+
+      // update partition column info in FS descriptor
+      ArrayList<ExprNodeDesc> partitionColumns = getPositionsToExprNodes(partitionPositions, rsOp
+          .getSchema().getSignature());
+      fsOp.getConf().setPartitionCols(partitionColumns);
+
+      LOG.info("Inserted " + rsOp.getOperatorId() + " and " + exOp.getOperatorId()
+          + " as parent of " + fsOp.getOperatorId() + " and child of " + fsParent.getOperatorId());
+      return null;
+    }
+
+    // Remove RS and EX introduced by enforce bucketing/sorting config
+    // Convert PARENT -> RS -> EX -> FS to PARENT -> FS
+    private void removeRSInsertedByEnforceBucketing(FileSinkOperator fsOp) {
+      HiveConf hconf = parseCtx.getConf();
+      boolean enforceBucketing = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCEBUCKETING);
+      boolean enforceSorting = HiveConf.getBoolVar(hconf, ConfVars.HIVEENFORCESORTING);
+      if (enforceBucketing || enforceSorting) {
+        Set<ReduceSinkOperator> reduceSinks = OperatorUtils.findOperatorsUpstream(fsOp,
+            ReduceSinkOperator.class);
+        Operator<? extends OperatorDesc> rsToRemove = null;
+        List<ReduceSinkOperator> rsOps = parseCtx
+            .getReduceSinkOperatorsAddedByEnforceBucketingSorting();
+        boolean found = false;
+
+        // iterate through all RS and locate the one introduced by enforce bucketing/sorting
+        for (ReduceSinkOperator reduceSink : reduceSinks) {
+          for (ReduceSinkOperator rsOp : rsOps) {
+            if (reduceSink.equals(rsOp)) {
+              rsToRemove = reduceSink;
+              found = true;
+              break;
+            }
+          }
+
+          if (found) {
+            break;
+          }
+        }
+
+        // If the RS is found, remove it and its child (EX) and connect its parent
+        // to its grandchild
+        if (found) {
+          Operator<? extends OperatorDesc> rsParent = rsToRemove.getParentOperators().get(0);
+          Operator<? extends OperatorDesc> rsChild = rsToRemove.getChildOperators().get(0);
+          Operator<? extends OperatorDesc> rsGrandChild = rsChild.getChildOperators().get(0);
+
+          if (rsChild instanceof ExtractOperator) {
+            rsParent.getChildOperators().clear();
+            rsParent.getChildOperators().add(rsGrandChild);
+            rsGrandChild.getParentOperators().clear();
+            rsGrandChild.getParentOperators().add(rsParent);
+            parseCtx.removeOpParseCtx(rsToRemove);
+            parseCtx.removeOpParseCtx(rsChild);
+            LOG.info("Removed " + rsParent.getOperatorId() + " and " + rsChild.getOperatorId()
+                + " as it was introduced by enforce bucketing/sorting.");
+          }
+        }
+      }
+    }
+
+    private List<Integer> getPartitionPositions(DynamicPartitionCtx dpCtx, RowSchema schema) {
+      int numPartCols = dpCtx.getNumDPCols();
+      int numCols = schema.getSignature().size();
+      List<Integer> partPos = Lists.newArrayList();
+
+      // partition columns will always be at the end of the schema
+      for (int i = numCols - numPartCols; i < numCols; i++) {
+        partPos.add(i);
+      }
+      return partPos;
+    }
+
+    // Get the bucket positions for the table
+    private List<Integer> getBucketPositions(List<String> tabBucketCols, List<FieldSchema> tabCols) {
+      List<Integer> posns = new ArrayList<Integer>();
+      for (String bucketCol : tabBucketCols) {
+        int pos = 0;
+        for (FieldSchema tabCol : tabCols) {
+          if (bucketCol.equals(tabCol.getName())) {
+            posns.add(pos);
+            break;
+          }
+          pos++;
+        }
+      }
+      return posns;
+    }
+
+    public ReduceSinkDesc getReduceSinkDesc(List<Integer> partitionPositions,
+        List<Integer> sortPositions, List<Integer> sortOrder, ArrayList<ExprNodeDesc> newValueCols,
+        ArrayList<ExprNodeDesc> bucketColumns, int numBuckets,
+        Operator<? extends OperatorDesc> parent) {
+
+      // Order of KEY columns
+      // 1) Partition columns
+      // 2) Bucket number column
+      // 3) Sort columns
+      List<Integer> keyColsPosInVal = Lists.newArrayList();
+      ArrayList<ExprNodeDesc> newKeyCols = Lists.newArrayList();
+      List<Integer> newSortOrder = Lists.newArrayList();
+      int numPartAndBuck = partitionPositions.size();
+
+      keyColsPosInVal.addAll(partitionPositions);
+      if (!bucketColumns.isEmpty()) {
+        keyColsPosInVal.add(-1);
+        numPartAndBuck += 1;
+      }
+      keyColsPosInVal.addAll(sortPositions);
+
+      // by default partition and bucket columns are sorted in ascending order
+      Integer order = 1;
+      if (sortOrder != null && !sortOrder.isEmpty()) {
+        if (sortOrder.get(0).intValue() == 0) {
+          order = 0;
+        }
+      }
+      for (int i = 0; i < numPartAndBuck; i++) {
+        newSortOrder.add(order);
+      }
+      newSortOrder.addAll(sortOrder);
+
+      ArrayList<ExprNodeDesc> newPartCols = Lists.newArrayList();
+
+      // clone here, as the RS will replace the bucket column key with its
+      // corresponding bucket number and hence change its OI
+      for (Integer idx : keyColsPosInVal) {
+        if (idx < 0) {
+          newKeyCols.add(new ExprNodeConstantDesc(TypeInfoFactory
+              .getPrimitiveTypeInfoFromPrimitiveWritable(IntWritable.class), -1));
+        } else {
+          newKeyCols.add(newValueCols.get(idx).clone());
+        }
+      }
+
+      for (Integer idx : partitionPositions) {
+        newPartCols.add(newValueCols.get(idx).clone());
+      }
+
+      String orderStr = "";
+      for (int i = 0; i < newKeyCols.size(); i++) {
+        orderStr += "+";
+      }
+
+      // Create the Key/Value TableDescs. When the operator plan is split into MR tasks,
+      // the reduce operator will initialize the Extract operator with information
+      // from the Key and Value TableDescs.
+      List<FieldSchema> fields = PlanUtils.getFieldSchemasFromColumnList(newKeyCols,
+          "reducesinkkey");
+      TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, orderStr);
+      ArrayList<String> outputKeyCols = Lists.newArrayList();
+      for (int i = 0; i < newKeyCols.size(); i++) {
+        outputKeyCols.add("reducesinkkey" + i);
+      }
+
+      List<String> outCols = Utilities.getInternalColumnNamesFromSignature(parent.getSchema()
+          .getSignature());
+      ArrayList<String> outValColNames = Lists.newArrayList(outCols);
+      List<FieldSchema> valFields = PlanUtils.getFieldSchemasFromColumnList(newValueCols,
+          outValColNames, 0, "");
+      TableDesc valueTable = PlanUtils.getReduceValueTableDesc(valFields);
+      List<List<Integer>> distinctColumnIndices = Lists.newArrayList();
+      int numDistributionKeys = newPartCols.size();
+      if (bucketColumns != null && !bucketColumns.isEmpty()) {
+        numDistributionKeys += 1;
+      }
+
+      // Number of reducers is set to default (-1)
+      ReduceSinkDesc rsConf = new ReduceSinkDesc(newKeyCols, numDistributionKeys, newValueCols,
+          outputKeyCols, distinctColumnIndices, outValColNames, -1, newPartCols, -1, keyTable,
+          valueTable);
+      rsConf.setBucketCols(bucketColumns);
+      rsConf.setNumBuckets(numBuckets);
+
+      return rsConf;
+    }
+
+    /**
+     * Get the sort positions and sort order for the sort columns.
+     * @param tabSortCols sort columns of the table
+     * @param tabCols all columns of the table
+     * @return pair of sort column positions and their sort orders
+     */
+    private ObjectPair<List<Integer>, List<Integer>> getSortPositionsOrder(List<Order> tabSortCols,
+        List<FieldSchema> tabCols) {
+      List<Integer> sortPositions = Lists.newArrayList();
+      List<Integer> sortOrders = Lists.newArrayList();
+      for (Order sortCol : tabSortCols) {
+        int pos = 0;
+        for (FieldSchema tabCol : tabCols) {
+          if (sortCol.getCol().equals(tabCol.getName())) {
+            sortPositions.add(pos);
+            sortOrders.add(sortCol.getOrder());
+            break;
+          }
+          pos++;
+        }
+      }
+      return new ObjectPair<List<Integer>, List<Integer>>(sortPositions, sortOrders);
+    }
+
+    private ArrayList<ExprNodeDesc> getPositionsToExprNodes(List<Integer> pos,
+        List<ColumnInfo> colInfos) {
+      ArrayList<ExprNodeDesc> cols = Lists.newArrayList();
+
+      for (Integer idx : pos) {
+        ColumnInfo ci = colInfos.get(idx);
+        ExprNodeColumnDesc encd = new ExprNodeColumnDesc(ci.getType(), ci.getInternalName(),
+            ci.getTabAlias(), ci.isHiddenVirtualCol());
+        cols.add(encd);
+      }
+
+      return cols;
+    }
+
+    private Operator<? extends Serializable> putOpInsertMap(Operator<?> op, RowResolver rr,
+        ParseContext context) {
+      OpParseContext ctx = new OpParseContext(rr);
+      context.getOpParseCtx().put(op, ctx);
+      return op;
+    }
+
+    private ObjectPair<String, RowResolver> copyRowResolver(RowResolver inputRR) {
+      ObjectPair<String, RowResolver> output = new ObjectPair<String, RowResolver>();
+      RowResolver outRR = new RowResolver();
+      int pos = 0;
+      String tabAlias = null;
+
+      for (ColumnInfo colInfo : inputRR.getColumnInfos()) {
+        String[] info = inputRR.reverseLookup(colInfo.getInternalName());
+        tabAlias = info[0];
+        outRR.put(info[0], info[1], new ColumnInfo(SemanticAnalyzer.getColumnInternalName(pos),
+            colInfo.getType(), info[0], colInfo.getIsVirtualCol(), colInfo.isHiddenVirtualCol()));
+        pos++;
+      }
+      output.setFirst(tabAlias);
+      output.setSecond(outRR);
+      return output;
+    }
+
+  }
+
+}
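
A minimal standalone sketch (not part of this patch) of the key ordering that getReduceSinkDesc() above builds: the dynamic partition columns come first, then a -1 placeholder for the bucket number computed at runtime when the table is bucketed, then the sort columns. The column positions below are made up for illustration.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class KeyOrderSketch {
      public static void main(String[] args) {
        // hypothetical positions in the parent operator's value row
        List<Integer> partitionPositions = Arrays.asList(4, 5); // e.g. ds, t
        List<Integer> sortPositions = Arrays.asList(3);         // e.g. f
        boolean bucketed = true;

        List<Integer> keyColsPosInVal = new ArrayList<Integer>();
        keyColsPosInVal.addAll(partitionPositions);
        if (bucketed) {
          keyColsPosInVal.add(-1); // placeholder the RS later replaces with the bucket number
        }
        keyColsPosInVal.addAll(sortPositions);

        // prints [4, 5, -1, 3]: rows are keyed by partition columns, bucket number,
        // then sort columns, so each reducer can write one file per partition/bucket
        System.out.println(keyColsPosInVal);
      }
    }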

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java Mon Mar 24 18:50:51 2014
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Stack;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.exec.ExtractOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
@@ -47,7 +48,11 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+
+import com.google.common.collect.Lists;
 
 /**
  * If two reducer sink operators share the same partition/sort columns and order,
@@ -296,6 +301,20 @@ public class ReduceSinkDeDuplication imp
         pRS.getConf().setNumReducers(cRS.getConf().getNumReducers());
       }
 
+      if (result[4] > 0) {
+        // This case happens only when the pRS key is empty, in which case we can use
+        // the number of distribution keys and key serialization info from cRS
+        pRS.getConf().setNumDistributionKeys(cRS.getConf().getNumDistributionKeys());
+        List<FieldSchema> fields = PlanUtils.getFieldSchemasFromColumnList(pRS.getConf()
+            .getKeyCols(), "reducesinkkey");
+        TableDesc keyTable = PlanUtils.getReduceKeyTableDesc(fields, pRS.getConf().getOrder());
+        ArrayList<String> outputKeyCols = Lists.newArrayList();
+        for (int i = 0; i < fields.size(); i++) {
+          outputKeyCols.add(fields.get(i).getName());
+        }
+        pRS.getConf().setOutputKeyColumnNames(outputKeyCols);
+        pRS.getConf().setKeySerializeInfo(keyTable);
+      }
       return true;
     }
 
@@ -333,7 +352,28 @@ public class ReduceSinkDeDuplication imp
       if (movePartitionColTo == null) {
         return null;
       }
-      return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo, moveReducerNumTo};
+      Integer moveNumDistKeyTo = checkNumDistributionKey(cConf.getNumDistributionKeys(),
+          pConf.getNumDistributionKeys());
+      return new int[] {moveKeyColTo, movePartitionColTo, moveRSOrderTo,
+          moveReducerNumTo, moveNumDistKeyTo};
+    }
+
+    private Integer checkNumDistributionKey(int cnd, int pnd) {
+      // The number of distribution keys of cRS is chosen only when numDistKeys of pRS
+      // is 0 or less. In all other cases, distribution of the keys is based on
+      // pRS, which is more generic than cRS.
+      // Examples:
+      // case 1: if the pRS sort key is (a, b), the cRS sort key is (a, b, c), and the
+      // numbers of distribution keys are 2 and 3 respectively, then after the merge the
+      // sort keys will be (a, b, c) while the number of distribution keys remains 2.
+      // case 2: if the pRS sort key is empty with 0 distribution keys, and the cRS
+      // sort key is (a, b) with 2 distribution keys, then after the merge the new
+      // sort key will be (a, b) and the number of distribution keys will be 2.
+      if (pnd <= 0) {
+        return 1;
+      }
+      return 0;
     }
 
     /**
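
As a rough illustration (not part of the patch), the net effect of checkNumDistributionKey() together with the result[4] handling above is that the merged ReduceSink keeps the parent's distribution-key count unless the parent had none, in which case it adopts the child's. A minimal sketch of that rule, with made-up key counts:

    public class DistKeyMergeSketch {
      // mirrors the two cases described in the comment above
      static int mergedNumDistKeys(int parentNumDistKeys, int childNumDistKeys) {
        return parentNumDistKeys <= 0 ? childNumDistKeys : parentNumDistKeys;
      }

      public static void main(String[] args) {
        System.out.println(mergedNumDistKeys(2, 3)); // case 1: prints 2 (parent's count kept)
        System.out.println(mergedNumDistKeys(0, 2)); // case 2: prints 2 (child's count adopted)
      }
    }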

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Mon Mar 24 18:50:51 2014
@@ -30,6 +30,12 @@ import org.apache.hadoop.fs.Path;
 @Explain(displayName = "File Output Operator")
 public class FileSinkDesc extends AbstractOperatorDesc {
   private static final long serialVersionUID = 1L;
+
+  public enum DPSortState {
+    NONE, PARTITION_SORTED, PARTITION_BUCKET_SORTED
+  }
+
+  private DPSortState dpSortState;
   private Path dirName;
   // normally statsKeyPref will be the same as dirName, but the latter
   // could be changed in local execution optimization
@@ -96,6 +102,7 @@ public class FileSinkDesc extends Abstra
     this.totalFiles = totalFiles;
     this.partitionCols = partitionCols;
     this.dpCtx = dpCtx;
+    this.dpSortState = DPSortState.NONE;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -110,6 +117,7 @@ public class FileSinkDesc extends Abstra
     this.numFiles = 1;
     this.totalFiles = 1;
     this.partitionCols = null;
+    this.dpSortState = DPSortState.NONE;
   }
 
   @Override
@@ -128,6 +136,7 @@ public class FileSinkDesc extends Abstra
     ret.setStatsReliable(statsReliable);
     ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength);
     ret.setStatsCollectRawDataSize(statsCollectRawDataSize);
+    ret.setDpSortState(dpSortState);
     return (Object) ret;
   }
 
@@ -381,4 +390,12 @@ public class FileSinkDesc extends Abstra
   public void setRemovedReduceSinkBucketSort(boolean removedReduceSinkBucketSort) {
     this.removedReduceSinkBucketSort = removedReduceSinkBucketSort;
   }
+
+  public DPSortState getDpSortState() {
+    return dpSortState;
+  }
+
+  public void setDpSortState(DPSortState dpSortState) {
+    this.dpSortState = dpSortState;
+  }
 }
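
A small sketch (not part of the patch) of how the new DPSortState is chosen in SortedDynPartitionOptimizer above: PARTITION_SORTED when the destination table has no bucket columns, PARTITION_BUCKET_SORTED otherwise.

    public class DpSortStateSketch {
      enum DPSortState { NONE, PARTITION_SORTED, PARTITION_BUCKET_SORTED }

      // numBucketCols is the number of bucketing columns on the destination table
      static DPSortState chooseState(int numBucketCols) {
        return numBucketCols > 0 ? DPSortState.PARTITION_BUCKET_SORTED
                                 : DPSortState.PARTITION_SORTED;
      }

      public static void main(String[] args) {
        System.out.println(chooseState(0)); // PARTITION_SORTED (e.g. over1k_part)
        System.out.println(chooseState(1)); // PARTITION_BUCKET_SORTED (e.g. over1k_part_buck, clustered by si)
      }
    }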

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java Mon Mar 24 18:50:51 2014
@@ -74,6 +74,12 @@ public class ReduceSinkDesc extends Abst
 
   private int numReducers;
 
+  /**
+   * Bucket information
+   */
+  private int numBuckets;
+  private List<ExprNodeDesc> bucketCols;
+
   private int topN = -1;
   private float topNMemoryUsage = -1;
   private boolean mapGroupBy;  // for group-by, values with same key on top-K should be forwarded
@@ -100,6 +106,8 @@ public class ReduceSinkDesc extends Abst
     this.keySerializeInfo = keySerializeInfo;
     this.valueSerializeInfo = valueSerializeInfo;
     this.distinctColumnIndices = distinctColumnIndices;
+    this.setNumBuckets(-1);
+    this.setBucketCols(null);
   }
 
   @Override
@@ -122,6 +130,8 @@ public class ReduceSinkDesc extends Abst
     desc.setPartitionCols((ArrayList<ExprNodeDesc>) getPartitionCols().clone());
     desc.setKeySerializeInfo((TableDesc) getKeySerializeInfo().clone());
     desc.setValueSerializeInfo((TableDesc) getValueSerializeInfo().clone());
+    desc.setNumBuckets(numBuckets);
+    desc.setBucketCols(bucketCols);
     return desc;
   }
 
@@ -299,4 +309,20 @@ public class ReduceSinkDesc extends Abst
   public void setOutputName(String outputName) {
     this.outputName = outputName;
   }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+
+  public void setNumBuckets(int numBuckets) {
+    this.numBuckets = numBuckets;
+  }
+
+  public List<ExprNodeDesc> getBucketCols() {
+    return bucketCols;
+  }
+
+  public void setBucketCols(List<ExprNodeDesc> bucketCols) {
+    this.bucketCols = bucketCols;
+  }
 }

Added: hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q?rev=1580992&view=auto
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q (added)
+++ hive/trunk/ql/src/test/queries/clientpositive/dynpart_sort_optimization.q Mon Mar 24 18:50:51 2014
@@ -0,0 +1,89 @@
+set hive.optimize.sort.dynamic.partition=true;
+set hive.exec.dynamic.partition=true;
+set hive.exec.max.dynamic.partitions=1000;
+set hive.exec.max.dynamic.partitions.pernode=1000;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.enforce.bucketing=false;
+set hive.enforce.sorting=false;
+
+create table over1k(
+           t tinyint,
+           si smallint,
+           i int,
+           b bigint,
+           f float,
+           d double,
+           bo boolean,
+           s string,
+           ts timestamp,
+           dec decimal(4,2),
+           bin binary)
+       row format delimited
+       fields terminated by '|';
+
+load data local inpath '../../data/files/over1k' into table over1k;
+
+create table over1k_part(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (ds string, t tinyint);
+
+create table over1k_part_limit like over1k_part;
+
+create table over1k_part_buck(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (t tinyint)
+       clustered by (si) into 4 buckets;
+
+create table over1k_part_buck_sort(
+           si smallint,
+           i int,
+           b bigint,
+           f float)
+       partitioned by (t tinyint)
+       clustered by (si) 
+       sorted by (f) into 4 buckets;
+
+-- map-only jobs converted to map-reduce jobs by the hive.optimize.sort.dynamic.partition optimization
+explain insert overwrite table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27;
+explain insert overwrite table over1k_part_limit partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27 limit 10;
+explain insert overwrite table over1k_part_buck partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+explain insert overwrite table over1k_part_buck_sort partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+insert overwrite table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27;
+insert overwrite table over1k_part_limit partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27 limit 10;
+insert overwrite table over1k_part_buck partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+insert overwrite table over1k_part_buck_sort partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+set hive.enforce.bucketing=true;
+set hive.enforce.sorting=true;
+
+-- map-reduce jobs modified by hive.optimize.sort.dynamic.partition optimization
+explain insert into table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27;
+explain insert into table over1k_part_limit partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27 limit 10;
+explain insert into table over1k_part_buck partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+explain insert into table over1k_part_buck_sort partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+insert into table over1k_part partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27;
+insert into table over1k_part_limit partition(ds="foo", t) select si,i,b,f,t from over1k where t is null or t=27 limit 10;
+insert into table over1k_part_buck partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+insert into table over1k_part_buck_sort partition(t) select si,i,b,f,t from over1k where t is null or t=27;
+
+desc formatted over1k_part partition(ds="foo",t=27);
+desc formatted over1k_part partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+desc formatted over1k_part_limit partition(ds="foo",t=27);
+desc formatted over1k_part_limit partition(ds="foo",t="__HIVE_DEFAULT_PARTITION__");
+desc formatted over1k_part_buck partition(t=27);
+desc formatted over1k_part_buck partition(t="__HIVE_DEFAULT_PARTITION__");
+desc formatted over1k_part_buck_sort partition(t=27);
+desc formatted over1k_part_buck_sort partition(t="__HIVE_DEFAULT_PARTITION__");
+
+select count(*) from over1k_part;
+select count(*) from over1k_part_limit;
+select count(*) from over1k_part_buck;
+select count(*) from over1k_part_buck_sort;

Modified: hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition.q?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition.q Mon Mar 24 18:50:51 2014
@@ -14,6 +14,7 @@ set hive.input.format=org.apache.hadoop.
 set hive.merge.mapfiles=false;
 set hive.merge.mapredfiles=false;
 set hive.merge.smallfiles.avgsize=1000000000;
+set hive.optimize.sort.dynamic.partition=false;
 explain
 insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';
 insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';

Modified: hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q (original)
+++ hive/trunk/ql/src/test/queries/clientpositive/merge_dynamic_partition2.q Mon Mar 24 18:50:51 2014
@@ -18,6 +18,7 @@ set hive.merge.mapfiles=true;
 set hive.merge.mapredfiles=true;
 set hive.merge.smallfiles.avgsize=3000;
 set hive.exec.compress.output=false;
+set hive.optimize.sort.dynamic.partition=false;
 
 explain
 insert overwrite table merge_dynamic_part partition (ds='2008-04-08', hr) select key, value, hr from srcpart_merge_dp where ds='2008-04-08';

Modified: hive/trunk/ql/src/test/results/clientnegative/dyn_part_max_per_node.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientnegative/dyn_part_max_per_node.q.out?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientnegative/dyn_part_max_per_node.q.out (original)
+++ hive/trunk/ql/src/test/results/clientnegative/dyn_part_max_per_node.q.out Mon Mar 24 18:50:51 2014
@@ -25,7 +25,7 @@ Obtaining error information
 
 Task failed!
 Task ID:
-  Stage-1
+  Stage-2
 
 Logs:
 

Modified: hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_16.q.out
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_16.q.out?rev=1580992&r1=1580991&r2=1580992&view=diff
==============================================================================
--- hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_16.q.out (original)
+++ hive/trunk/ql/src/test/results/clientpositive/auto_sortmerge_join_16.q.out Mon Mar 24 18:50:51 2014
@@ -239,24 +239,12 @@ POSTHOOK: Lineage: bucket_small PARTITIO
 0	val_0	val_0	day1	1
 0	val_0	val_0	day1	1
 0	val_0	val_0	day1	1
-0	val_0	val_0	day1	1
-0	val_0	val_0	day1	1
-0	val_0	val_0	day1	1
-169	val_169	val_169	day1	1
-169	val_169	val_169	day1	1
-169	val_169	val_169	day1	1
 169	val_169	val_169	day1	1
 169	val_169	val_169	day1	1
 169	val_169	val_169	day1	1
 169	val_169	val_169	day1	1
 169	val_169	val_169	day1	1
 374	val_374	val_374	day1	1
-374	val_374	val_374	day1	1
-172	val_172	val_172	day1	1
-172	val_172	val_172	day1	1
 172	val_172	val_172	day1	1
 172	val_172	val_172	day1	1
 103	val_103	val_103	day1	1
-103	val_103	val_103	day1	1
-103	val_103	val_103	day1	1
-103	val_103	val_103	day1	1


