hive-commits mailing list archives

From: vik...@apache.org
Subject: svn commit: r1582613 [1/4] - in /hive/branches/branch-0.13: common/src/java/org/apache/hadoop/hive/conf/ itests/qtest/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/io/...
Date: Fri, 28 Mar 2014 05:53:13 GMT
Author: vikram
Date: Fri Mar 28 05:53:12 2014
New Revision: 1582613

URL: http://svn.apache.org/r1582613
Log:
HIVE-6447 : Bucket map joins in hive-tez (Vikram Dixit, reviewed by Harish Butani, Gunther Hagleitner)

Added:
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomEdgeConfiguration.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TezBucketJoinProcCtx.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateOpTraitsProcCtx.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/AnnotateWithOpTraits.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/metainfo/annotation/OpTraitsRulesProcFactory.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OpTraits.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezEdgeProperty.java
    hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez1.q
    hive/branches/branch-0.13/ql/src/test/queries/clientpositive/bucket_map_join_tez2.q
    hive/branches/branch-0.13/ql/src/test/results/clientpositive/tez/bucket_map_join_tez1.q.out
    hive/branches/branch-0.13/ql/src/test/results/clientpositive/tez/bucket_map_join_tez2.q.out
Modified:
    hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/branch-0.13/itests/qtest/pom.xml
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/MapJoinDesc.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/OperatorDesc.java
    hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
    hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
    hive/branches/branch-0.13/ql/src/test/org/apache/hadoop/hive/ql/plan/TestTezWork.java

Modified: hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/branch-0.13/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Fri Mar 28 05:53:12 2014
@@ -986,7 +986,8 @@ public class HiveConf extends Configurat
     // Setting to 0.12:
     //   Maintains division behavior: int / int => double
     // Setting to 0.13:
-    HIVE_COMPAT("hive.compat", HiveCompat.DEFAULT_COMPAT_LEVEL)
+    HIVE_COMPAT("hive.compat", HiveCompat.DEFAULT_COMPAT_LEVEL),
+    HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ("hive.convert.join.bucket.mapjoin.tez", false)
     ;
 
     public final String varname;

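The new hive.convert.join.bucket.mapjoin.tez flag defaults to false, so Tez bucket map joins remain opt-in. As a rough sketch of how a planner component would consult it (the actual call site is in ConvertJoinMapJoin, whose diff appears in a later part of this commit; the surrounding variables here are assumptions):

    // conf is an org.apache.hadoop.conf.Configuration carrying the session settings (assumed)
    boolean tezBucketMapJoin = HiveConf.getBoolVar(
        conf, HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ);
    if (tezBucketMapJoin) {
      // try to turn a map join over bucketed tables into a bucket map join on Tez
    }
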
Modified: hive/branches/branch-0.13/itests/qtest/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/itests/qtest/pom.xml?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/itests/qtest/pom.xml (original)
+++ hive/branches/branch-0.13/itests/qtest/pom.xml Fri Mar 28 05:53:12 2014
@@ -38,7 +38,7 @@
     <execute.beeline.tests>false</execute.beeline.tests>
     <minimr.query.files>stats_counter_partitioned.q,list_bucket_dml_10.q,input16_cc.q,scriptfile1.q,scriptfile1_win.q,bucket4.q,bucketmapjoin6.q,disable_merge_for_bucketing.q,reduce_deduplicate.q,smb_mapjoin_8.q,join1.q,groupby2.q,bucketizedhiveinputformat.q,bucketmapjoin7.q,optrstat_groupby.q,bucket_num_reducers.q,bucket5.q,load_fs2.q,bucket_num_reducers2.q,infer_bucket_sort_merge.q,infer_bucket_sort_reducers_power_two.q,infer_bucket_sort_dyn_part.q,infer_bucket_sort_bucketed_table.q,infer_bucket_sort_map_operators.q,infer_bucket_sort_num_buckets.q,leftsemijoin_mr.q,schemeAuthority.q,schemeAuthority2.q,truncate_column_buckets.q,remote_script.q,,load_hdfs_file_with_space_in_the_name.q,parallel_orderby.q,import_exported_table.q,stats_counter.q,auto_sortmerge_join_16.q,quotedid_smb.q,file_with_header_footer.q,external_table_with_space_in_location_path.q,root_dir_external_table.q,index_bitmap3.q,ql_rewrite_gbtoidx.q,index_bitmap_auto.q,udf_using.q</minimr.query.files>
     <minimr.query.negative.files>cluster_tasklog_retrieval.q,minimr_broken_pipe.q,mapreduce_stack_trace.q,mapreduce_stack_trace_turnoff.q,mapreduce_stack_trace_hadoop20.q,mapreduce_stack_trace_turnoff_hadoop20.q,file_with_header_footer_negative.q,udf_local_resource.q</minimr.query.negative.files>
-    <minitez.query.files>mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q</minitez.query.files>
+    <minitez.query.files>mapjoin_decimal.q,tez_join_tests.q,tez_joins_explain.q,mrr.q,tez_dml.q,tez_insert_overwrite_local_directory_1.q,tez_union.q,bucket_map_join_tez1.q,bucket_map_join_tez2.q</minitez.query.files>
     <minitez.query.files.shared>dynpart_sort_opt_vectorization.q,dynpart_sort_optimization.q,orc_analyze.q,join0.q,join1.q,auto_join0.q,auto_join1.q,bucket2.q,bucket3.q,bucket4.q,count.q,create_merge_compressed.q,cross_join.q,ctas.q,custom_input_output_format.q,disable_merge_for_bucketing.q,enforce_order.q,filter_join_breaktask.q,filter_join_breaktask2.q,groupby1.q,groupby2.q,groupby3.q,having.q,insert1.q,insert_into1.q,insert_into2.q,leftsemijoin.q,limit_pushdown.q,load_dyn_part1.q,load_dyn_part2.q,load_dyn_part3.q,mapjoin_mapjoin.q,mapreduce1.q,mapreduce2.q,merge1.q,merge2.q,metadata_only_queries.q,sample1.q,subquery_in.q,subquery_exists.q,vectorization_15.q,ptf.q,stats_counter.q,stats_noscan_1.q,stats_counter_partitioned.q,union2.q,union3.q,union4.q,union5.q,union6.q,union7.q,union8.q,union9.q</minitez.query.files.shared>
    <beeline.positive.exclude>add_part_exist.q,alter1.q,alter2.q,alter4.q,alter5.q,alter_rename_partition.q,alter_rename_partition_authorization.q,archive.q,archive_corrupt.q,archive_multi.q,archive_mr_1806.q,archive_multi_mr_1806.q,authorization_1.q,authorization_2.q,authorization_4.q,authorization_5.q,authorization_6.q,authorization_7.q,ba_table1.q,ba_table2.q,ba_table3.q,ba_table_udfs.q,binary_table_bincolserde.q,binary_table_colserde.q,cluster.q,columnarserde_create_shortcut.q,combine2.q,constant_prop.q,create_nested_type.q,create_or_replace_view.q,create_struct_table.q,create_union_table.q,database.q,database_location.q,database_properties.q,ddltime.q,describe_database_json.q,drop_database_removes_partition_dirs.q,escape1.q,escape2.q,exim_00_nonpart_empty.q,exim_01_nonpart.q,exim_02_00_part_empty.q,exim_02_part.q,exim_03_nonpart_over_compat.q,exim_04_all_part.q,exim_04_evolved_parts.q,exim_05_some_part.q,exim_06_one_part.q,exim_07_all_part_over_nonoverlap.q,exim_08_nonpart_rename.q,exim_09_part_spec_nonoverlap.q,exim_10_external_managed.q,exim_11_managed_external.q,exim_12_external_location.q,exim_13_managed_location.q,exim_14_managed_location_over_existing.q,exim_15_external_part.q,exim_16_part_external.q,exim_17_part_managed.q,exim_18_part_external.q,exim_19_00_part_external_location.q,exim_19_part_external_location.q,exim_20_part_managed_location.q,exim_21_export_authsuccess.q,exim_22_import_exist_authsuccess.q,exim_23_import_part_authsuccess.q,exim_24_import_nonexist_authsuccess.q,global_limit.q,groupby_complex_types.q,groupby_complex_types_multi_single_reducer.q,index_auth.q,index_auto.q,index_auto_empty.q,index_bitmap.q,index_bitmap1.q,index_bitmap2.q,index_bitmap3.q,index_bitmap_auto.q,index_bitmap_rc.q,index_compact.q,index_compact_1.q,index_compact_2.q,index_compact_3.q,index_stale_partitioned.q,init_file.q,input16.q,input16_cc.q,input46.q,input_columnarserde.q,input_dynamicserde.q,input_lazyserde.q,input_testxpath3.q,input_testxpath4.q,insert2_overwrite_partitions.q,insertexternal1.q,join_thrift.q,lateral_view.q,load_binary_data.q,load_exist_part_authsuccess.q,load_nonpart_authsuccess.q,load_part_authsuccess.q,loadpart_err.q,lock1.q,lock2.q,lock3.q,lock4.q,merge_dynamic_partition.q,multi_insert.q,multi_insert_move_tasks_share_dependencies.q,null_column.q,ppd_clusterby.q,query_with_semi.q,rename_column.q,sample6.q,sample_islocalmode_hook.q,set_processor_namespaces.q,show_tables.q,source.q,split_sample.q,str_to_map.q,transform1.q,udaf_collect_set.q,udaf_context_ngrams.q,udaf_histogram_numeric.q,udaf_ngrams.q,udaf_percentile_approx.q,udf_array.q,udf_bitmap_and.q,udf_bitmap_or.q,udf_explode.q,udf_format_number.q,udf_map.q,udf_map_keys.q,udf_map_values.q,udf_max.q,udf_min.q,udf_named_struct.q,udf_percentile.q,udf_printf.q,udf_sentences.q,udf_sort_array.q,udf_split.q,udf_struct.q,udf_substr.q,udf_translate.q,udf_union.q,udf_xpath.q,udtf_stack.q,view.q,virtual_column.q</beeline.positive.exclude>
   </properties>

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Mar 28 05:53:12 2014
@@ -100,8 +100,10 @@ public class MapJoinOperator extends Abs
     mapJoinTables = (MapJoinTableContainer[]) cache.retrieve(tableKey);
     mapJoinTableSerdes = (MapJoinTableContainerSerDe[]) cache.retrieve(serdeKey);
     hashTblInitedOnce = true;
+    LOG.info("Try to retrieve from cache");
 
     if (mapJoinTables == null || mapJoinTableSerdes == null) {
+      LOG.info("Did not find tables in cache");
       mapJoinTables = new MapJoinTableContainer[tagLen];
       mapJoinTableSerdes = new MapJoinTableContainerSerDe[tagLen];
       hashTblInitedOnce = false;
@@ -148,8 +150,19 @@ public class MapJoinOperator extends Abs
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
     loader.init(getExecContext(), hconf, this);
     loader.load(mapJoinTables, mapJoinTableSerdes);
-    cache.cache(tableKey, mapJoinTables);
-    cache.cache(serdeKey, mapJoinTableSerdes);
+    if (conf.isBucketMapJoin() == false) {
+      /*
+       * The issue with caching in case of bucket map join is that different tasks
+       * process different buckets and if the container is reused to join a different bucket,
+       * join results can be incorrect. The cache is keyed on operator id and for bucket map join
+       * the operator does not change but data needed is different. For a proper fix, this
+       * requires changes in the Tez API with regard to finding bucket id and 
+       * also ability to schedule tasks to re-use containers that have cached the specific bucket.
+       */
+      LOG.info("This is not bucket map join, so cache");
+      cache.cache(tableKey, mapJoinTables);
+      cache.cache(serdeKey, mapJoinTableSerdes);
+    }
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.LOAD_HASHTABLE);
   }
 

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Mar 28 05:53:12 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -1246,6 +1247,25 @@ public abstract class Operator<T extends
     }
     return null;
   }
+  
+  public OpTraits getOpTraits() {
+    if (conf != null) {
+      return conf.getOpTraits();
+    }
+    
+    return null;
+  }
+  
+  public void setOpTraits(OpTraits metaInfo) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Setting traits ("+metaInfo+") on "+this);
+    }
+    if (conf != null) {
+      conf.setOpTraits(metaInfo);
+    } else {
+      LOG.warn("Cannot set traits when there's no descriptor: "+this);
+    }
+  }
 
   public void setStatistics(Statistics stats) {
     if (LOG.isDebugEnabled()) {

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomEdgeConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomEdgeConfiguration.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomEdgeConfiguration.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomEdgeConfiguration.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.io.Writable;
+
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Multimap;
+
+class CustomEdgeConfiguration implements Writable {
+  boolean vertexInited = false;
+  int numBuckets = -1;
+  Multimap<Integer, Integer> bucketToTaskMap = null;
+  
+  public CustomEdgeConfiguration() {
+  }
+  
+  public CustomEdgeConfiguration(int numBuckets, Multimap<Integer, Integer> routingTable) {
+    this.bucketToTaskMap = routingTable;
+    this.numBuckets = numBuckets;
+    if (routingTable != null) {
+      vertexInited = true;
+    }
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(vertexInited);
+    out.writeInt(numBuckets);
+    if (bucketToTaskMap == null) {
+      return;
+    }
+    
+    out.writeInt(bucketToTaskMap.size());
+    for (Entry<Integer, Collection<Integer>> entry : bucketToTaskMap.asMap().entrySet()) {
+      int bucketNum = entry.getKey();
+      for (Integer taskId : entry.getValue()) {
+        out.writeInt(bucketNum);
+        out.writeInt(taskId);
+      }
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.vertexInited = in.readBoolean();
+    this.numBuckets = in.readInt();
+    if (this.vertexInited == false) {
+      return;
+    }
+
+    int count = in.readInt();
+    bucketToTaskMap = LinkedListMultimap.create();
+    for (int i = 0; i < count; i++) {
+      bucketToTaskMap.put(in.readInt(), in.readInt());
+    }
+
+    if (count != bucketToTaskMap.size()) {
+      throw new IOException("Was not a clean translation. Some records are missing");
+    }
+  }
+
+  public Multimap<Integer, Integer> getRoutingTable() {
+    return bucketToTaskMap;
+  }
+
+  public int getNumBuckets() {
+    return numBuckets;
+  }
+}
\ No newline at end of file
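The new components below (CustomPartitionEdge and CustomPartitionVertex) exchange this Writable as a raw byte[] user payload. A minimal round-trip sketch, assuming same-package access and made-up bucket/task numbers; it mirrors CustomPartitionVertex.getBytePayload() on the write side and CustomPartitionEdge.initialize() on the read side:

    // Uses org.apache.hadoop.io.DataOutputBuffer/DataInputBuffer and Guava's LinkedListMultimap.
    Multimap<Integer, Integer> routing = LinkedListMultimap.create();
    routing.put(0, 0);   // bucket 0 -> task 0 (assumed values)
    routing.put(0, 2);   // bucket 0 -> task 2
    routing.put(1, 1);   // bucket 1 -> task 1

    CustomEdgeConfiguration written = new CustomEdgeConfiguration(2, routing);
    DataOutputBuffer dob = new DataOutputBuffer();
    written.write(dob);
    byte[] payload = dob.getData();          // becomes the edge user payload

    CustomEdgeConfiguration restored = new CustomEdgeConfiguration();
    DataInputBuffer dib = new DataInputBuffer();
    dib.reset(payload, payload.length);
    restored.readFields(dib);                // numBuckets == 2, routing table rebuilt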

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.tez.dag.api.EdgeManager;
+import org.apache.tez.dag.api.EdgeManagerContext;
+import org.apache.tez.runtime.api.events.DataMovementEvent;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
+
+import com.google.common.collect.Multimap;
+
+public class CustomPartitionEdge implements EdgeManager {
+
+  private static final Log LOG = LogFactory.getLog(CustomPartitionEdge.class.getName());
+
+  CustomEdgeConfiguration conf = null;
+
+  // used by the framework at runtime. initialize is the real initializer at runtime
+  public CustomPartitionEdge() {  
+  }
+
+  @Override
+  public int getNumDestinationTaskPhysicalInputs(int numSourceTasks, 
+      int destinationTaskIndex) {
+    return numSourceTasks;
+  }
+
+  @Override
+  public int getNumSourceTaskPhysicalOutputs(int numDestinationTasks, 
+      int sourceTaskIndex) {
+    return conf.getNumBuckets();
+  }
+
+  @Override
+  public int getNumDestinationConsumerTasks(int sourceTaskIndex, int numDestinationTasks) {
+    return numDestinationTasks;
+  }
+
+  // called at runtime to initialize the custom edge.
+  @Override
+  public void initialize(EdgeManagerContext context) {
+    byte[] payload = context.getUserPayload();
+    LOG.info("Initializing the edge, payload: " + payload);
+    if (payload == null) {
+      throw new RuntimeException("Invalid payload");
+    }
+    // De-serialization code
+    DataInputBuffer dib = new DataInputBuffer();
+    dib.reset(payload, payload.length);
+    conf = new CustomEdgeConfiguration();
+    try {
+      conf.readFields(dib);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+
+    LOG.info("Routing table: " + conf.getRoutingTable() + " num Buckets: " + conf.getNumBuckets());
+  }
+
+  @Override
+  public void routeDataMovementEventToDestination(DataMovementEvent event,
+      int sourceTaskIndex, int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
+    int srcIndex = event.getSourceIndex();
+    List<Integer> destTaskIndices = new ArrayList<Integer>();
+    destTaskIndices.addAll(conf.getRoutingTable().get(srcIndex));
+    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+  }
+
+  @Override
+  public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, 
+      int numDestinationTasks, Map<Integer, List<Integer>> mapDestTaskIndices) {
+    List<Integer> destTaskIndices = new ArrayList<Integer>();
+    addAllDestinationTaskIndices(numDestinationTasks, destTaskIndices);
+    mapDestTaskIndices.put(new Integer(sourceTaskIndex), destTaskIndices);
+  }
+
+  @Override
+  public int routeInputErrorEventToSource(InputReadErrorEvent event, 
+      int destinationTaskIndex) {
+    return event.getIndex();
+  }
+
+  void addAllDestinationTaskIndices(int numDestinationTasks, List<Integer> taskIndices) {
+    for(int i=0; i<numDestinationTasks; ++i) {
+      taskIndices.add(new Integer(i));
+    }
+  }
+}
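Read together with CustomEdgeConfiguration above: each source task exposes one physical output per bucket (getNumSourceTaskPhysicalOutputs returns the bucket count), so the source index of a DataMovementEvent identifies the bucket it carries. A small illustration of the routing-table lookup, with assumed values:

    // Assumed routing table: bucket 0 -> tasks {0, 2}, bucket 1 -> tasks {1, 3}
    // (java.util and Guava imports assumed)
    Multimap<Integer, Integer> routingTable = LinkedListMultimap.create();
    routingTable.putAll(0, Arrays.asList(0, 2));
    routingTable.putAll(1, Arrays.asList(1, 3));
    // An event whose source index is 1 (i.e. bucket 1) is forwarded to destination tasks 1 and 3,
    // which is what routeDataMovementEventToDestination does with conf.getRoutingTable():
    List<Integer> destTaskIndices = new ArrayList<Integer>(routingTable.get(1));   // [1, 3]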

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,402 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.tez;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat;
+import org.apache.hadoop.mapred.split.TezMapredSplitsGrouper;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.RootInputConfigureVertexTasksEvent;
+import org.apache.tez.runtime.api.events.RootInputDataInformationEvent;
+import org.apache.tez.runtime.api.events.RootInputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+
+
+/*
+ * Only works with old mapred API
+ * Will only work with a single MRInput for now.
+ */
+public class CustomPartitionVertex implements VertexManagerPlugin {
+
+  private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
+
+
+  VertexManagerPluginContext context;
+
+  private Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer>create();
+  private Multimap<Integer, InputSplit> bucketToInitialSplitMap = 
+      ArrayListMultimap.<Integer, InputSplit>create();
+
+  private RootInputConfigureVertexTasksEvent configureVertexTaskEvent;
+  private List<RootInputDataInformationEvent> dataInformationEvents;
+  private Map<Path, List<FileSplit>> pathFileSplitsMap = new TreeMap<Path, List<FileSplit>>();
+  private int numBuckets = -1;
+  private Configuration conf = null;
+  private boolean rootVertexInitialized = false;
+  Multimap<Integer, InputSplit> bucketToGroupedSplitMap;
+
+
+  private Map<Integer, Integer> bucketToNumTaskMap = new HashMap<Integer, Integer>();
+
+  public CustomPartitionVertex() {
+  }
+
+  @Override
+  public void initialize(VertexManagerPluginContext context) {
+    this.context = context; 
+    ByteBuffer byteBuf = ByteBuffer.wrap(context.getUserPayload());
+    this.numBuckets = byteBuf.getInt();
+  }
+
+  @Override
+  public void onVertexStarted(Map<String, List<Integer>> completions) {
+    int numTasks = context.getVertexNumTasks(context.getVertexName());
+    List<Integer> scheduledTasks = new ArrayList<Integer>(numTasks);
+    for (int i=0; i<numTasks; ++i) {
+      scheduledTasks.add(new Integer(i));
+    }
+    context.scheduleVertexTasks(scheduledTasks);
+  }
+
+  @Override
+  public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
+  }
+
+  @Override
+  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+  }
+
+  // One call per root Input - and for now only one is handled.
+  @Override
+  public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+      List<Event> events) {
+
+    // Ideally, since there's only 1 Input expected at the moment -
+    // ensure this method is called only once. Tez will call it once per Root Input.
+    Preconditions.checkState(rootVertexInitialized == false);
+    rootVertexInitialized = true;
+    try {
+      // This is using the payload from the RootVertexInitializer corresponding
+      // to InputName. Ideally it should be using its own configuration class - but that
+      // means serializing another instance.
+      MRInputUserPayloadProto protoPayload = 
+          MRHelpers.parseMRInputPayload(inputDescriptor.getUserPayload());
+      this.conf = MRHelpers.createConfFromByteString(protoPayload.getConfigurationBytes());
+
+      /*
+       * Currently in tez, the flow of events is thus: "Generate Splits -> Initialize Vertex"
+       * (with parallelism info obtained from the generate splits phase). The generate splits
+       * phase groups splits using the TezGroupedSplitsInputFormat. However, for bucket map joins
+       * the grouping done by this input format results in incorrect results as the grouper has no
+       * knowledge of buckets. So, we initially set the input format to be HiveInputFormat
+       * (in DagUtils) for the case of bucket map joins so as to obtain un-grouped splits.
+       * We then group the splits corresponding to buckets using the tez grouper which returns
+       * TezGroupedSplits.
+       */
+
+      // This assumes that Grouping will always be used. 
+      // Changing the InputFormat - so that the correct one is initialized in MRInput.
+      this.conf.set("mapred.input.format.class", TezGroupedSplitsInputFormat.class.getName());
+      MRInputUserPayloadProto updatedPayload = MRInputUserPayloadProto
+          .newBuilder(protoPayload)
+          .setConfigurationBytes(MRHelpers.createByteStringFromConf(conf))
+          .build();
+      inputDescriptor.setUserPayload(updatedPayload.toByteArray());
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new RuntimeException(e);
+    }
+    boolean dataInformationEventSeen = false;
+    for (Event event : events) {
+      if (event instanceof RootInputConfigureVertexTasksEvent) {
+        // No tasks should have been started yet. Checked by initial state check.
+        Preconditions.checkState(dataInformationEventSeen == false);
+        Preconditions
+        .checkState(
+            context.getVertexNumTasks(context.getVertexName()) == -1,
+            "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
+        RootInputConfigureVertexTasksEvent cEvent = (RootInputConfigureVertexTasksEvent) event;
+
+        // The vertex cannot be configured until all DataEvents are seen - to build the routing table.
+        configureVertexTaskEvent = cEvent;
+        dataInformationEvents = Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
+      }
+      if (event instanceof RootInputUpdatePayloadEvent) {
+        // this event can never occur. If it does, fail.
+        Preconditions.checkState(false);
+      } else if (event instanceof RootInputDataInformationEvent) {
+        dataInformationEventSeen = true;
+        RootInputDataInformationEvent diEvent = (RootInputDataInformationEvent) event;
+        dataInformationEvents.add(diEvent);
+        FileSplit fileSplit;
+        try {
+          fileSplit = getFileSplitFromEvent(diEvent);
+        } catch (IOException e) {
+          throw new RuntimeException("Failed to get file split for event: " + diEvent);
+        }
+        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath()); 
+        if (fsList == null) {
+          fsList = new ArrayList<FileSplit>();
+          pathFileSplitsMap.put(fileSplit.getPath(), fsList);
+        }
+        fsList.add(fileSplit);
+      }
+    }
+
+    setBucketNumForPath(pathFileSplitsMap);
+    try {
+      groupSplits();
+      processAllEvents(inputName);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private void processAllEvents(String inputName) throws IOException {
+
+    List<InputSplit> finalSplits = Lists.newLinkedList();
+    int taskCount = 0;
+    for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+      int bucketNum = entry.getKey();
+      Collection<InputSplit> initialSplits = entry.getValue();
+      finalSplits.addAll(initialSplits);
+      for (int i = 0; i < initialSplits.size(); i++) {
+        bucketToTaskMap.put(bucketNum, taskCount);
+        taskCount++;
+      }
+    }
+
+    // Construct the EdgeManager descriptor to be used by all edges which need the routing table.
+    EdgeManagerDescriptor hiveEdgeManagerDesc = new EdgeManagerDescriptor(
+        CustomPartitionEdge.class.getName());    
+    byte[] payload = getBytePayload(bucketToTaskMap);
+    hiveEdgeManagerDesc.setUserPayload(payload);
+
+    Map<String, EdgeManagerDescriptor> emMap = Maps.newHashMap();
+
+    // Replace the edge manager for all vertices which have routing type custom.
+    for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
+      if (edgeEntry.getValue().getDataMovementType() == DataMovementType.CUSTOM
+          && edgeEntry.getValue().getEdgeManagerDescriptor().getClassName()
+              .equals(CustomPartitionEdge.class.getName())) {
+        emMap.put(edgeEntry.getKey(), hiveEdgeManagerDesc);
+      }
+    }
+
+    LOG.info("Task count is " + taskCount);
+
+    List<RootInputDataInformationEvent> taskEvents = Lists.newArrayListWithCapacity(finalSplits.size());
+    // Re-serialize the splits after grouping.
+    int count = 0;
+    for (InputSplit inputSplit : finalSplits) {
+      MRSplitProto serializedSplit = MRHelpers.createSplitProto(inputSplit);
+      RootInputDataInformationEvent diEvent = new RootInputDataInformationEvent(
+          count, serializedSplit.toByteArray());
+      diEvent.setTargetIndex(count);
+      count++;
+      taskEvents.add(diEvent);
+    }
+
+    // Replace the Edge Managers
+    context.setVertexParallelism(
+        taskCount,
+        new VertexLocationHint(createTaskLocationHintsFromSplits(finalSplits
+            .toArray(new InputSplit[finalSplits.size()]))), emMap);
+
+    // Set the actual events for the tasks.
+    context.addRootInputEvents(inputName, taskEvents);
+  }
+
+  private byte[] getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
+    CustomEdgeConfiguration edgeConf = 
+        new CustomEdgeConfiguration(routingTable.keySet().size(), routingTable);
+    DataOutputBuffer dob = new DataOutputBuffer();
+    edgeConf.write(dob);
+    byte[] serialized = dob.getData();
+
+    return serialized;
+  }
+
+  private FileSplit getFileSplitFromEvent(RootInputDataInformationEvent event)
+      throws IOException {
+    InputSplit inputSplit = null;
+    if (event.getDeserializedUserPayload() != null) {
+      inputSplit = (InputSplit) event.getDeserializedUserPayload();
+    } else {
+      MRSplitProto splitProto = MRSplitProto.parseFrom(event.getUserPayload());
+      SerializationFactory serializationFactory = new SerializationFactory(
+          new Configuration());
+      inputSplit = MRHelpers.createOldFormatSplitFromUserPayload(splitProto,
+          serializationFactory);
+    }
+
+    if (!(inputSplit instanceof FileSplit)) {
+      throw new UnsupportedOperationException(
+          "Cannot handle splits other than FileSplit for the moment");
+    }
+    return (FileSplit) inputSplit;
+  }
+
+  /*
+   * This method generates the map of bucket to file splits.
+   */
+  private void setBucketNumForPath(Map<Path, List<FileSplit>> pathFileSplitsMap) {
+    int bucketNum = 0;
+    int fsCount = 0;
+    for (Map.Entry<Path, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
+      int bucketId = bucketNum % numBuckets;
+      for (FileSplit fsplit : entry.getValue()) {
+        fsCount++;
+        bucketToInitialSplitMap.put(bucketId, fsplit);
+      }
+      bucketNum++;
+    }
+
+    LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: " 
+        + pathFileSplitsMap.size());
+  }
+
+  private void groupSplits () throws IOException {
+    estimateBucketSizes();
+    bucketToGroupedSplitMap = 
+        ArrayListMultimap.<Integer, InputSplit>create(bucketToInitialSplitMap);
+    
+    Map<Integer, Collection<InputSplit>> bucketSplitMap = bucketToInitialSplitMap.asMap();
+    for (int bucketId : bucketSplitMap.keySet()) {
+      Collection<InputSplit>inputSplitCollection = bucketSplitMap.get(bucketId);
+      TezMapredSplitsGrouper grouper = new TezMapredSplitsGrouper();
+
+      InputSplit[] groupedSplits = grouper.getGroupedSplits(conf, 
+          inputSplitCollection.toArray(new InputSplit[0]), bucketToNumTaskMap.get(bucketId),
+          HiveInputFormat.class.getName());
+      LOG.info("Original split size is " + 
+          inputSplitCollection.toArray(new InputSplit[0]).length + 
+          " grouped split size is " + groupedSplits.length);
+      bucketToGroupedSplitMap.removeAll(bucketId);
+      for (InputSplit inSplit : groupedSplits) {
+        bucketToGroupedSplitMap.put(bucketId, inSplit);
+      }
+    }
+  }
+
+  private void estimateBucketSizes() {
+    Map<Integer, Long>bucketSizeMap = new HashMap<Integer, Long>();
+    Map<Integer, Collection<InputSplit>> bucketSplitMap = bucketToInitialSplitMap.asMap();
+    long totalSize = 0;
+    for (int bucketId : bucketSplitMap.keySet()) {
+      Long size = 0L;
+      Collection<InputSplit>inputSplitCollection = bucketSplitMap.get(bucketId);
+      Iterator<InputSplit> iter = inputSplitCollection.iterator();
+      while (iter.hasNext()) {
+        FileSplit fsplit = (FileSplit)iter.next();
+        size += fsplit.getLength();
+        totalSize += fsplit.getLength();
+      }
+      bucketSizeMap.put(bucketId, size);
+    }
+
+    int totalResource = context.getTotalAVailableResource().getMemory();
+    int taskResource = context.getVertexTaskResource().getMemory();
+    float waves = conf.getFloat(
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES,
+        TezConfiguration.TEZ_AM_GROUPING_SPLIT_WAVES_DEFAULT);
+
+    int numTasks = (int)((totalResource*waves)/taskResource);
+    LOG.info("Total resource: " + totalResource + " Task Resource: " + taskResource
+        + " waves: " + waves + " total size of splits: " + totalSize + 
+        " total number of tasks: " + numTasks);
+
+    for (int bucketId : bucketSizeMap.keySet()) {
+      int numEstimatedTasks = 0;
+      if (totalSize != 0) {
+        numEstimatedTasks = (int)(numTasks * bucketSizeMap.get(bucketId) / totalSize);
+      }
+      LOG.info("Estimated number of tasks: " + numEstimatedTasks + " for bucket " + bucketId);
+      if (numEstimatedTasks == 0) {
+        numEstimatedTasks = 1;
+      }
+      bucketToNumTaskMap.put(bucketId, numEstimatedTasks);
+    }
+  }
+
+  private static List<TaskLocationHint> createTaskLocationHintsFromSplits(
+      org.apache.hadoop.mapred.InputSplit[] oldFormatSplits) {
+    Iterable<TaskLocationHint> iterable = Iterables.transform(Arrays.asList(oldFormatSplits),
+        new Function<org.apache.hadoop.mapred.InputSplit, TaskLocationHint>() {
+      @Override
+      public TaskLocationHint apply(org.apache.hadoop.mapred.InputSplit input) {
+        try {
+          if (input.getLocations() != null) {
+            return new TaskLocationHint(new HashSet<String>(Arrays.asList(input.getLocations())),
+                null);
+          } else {
+            LOG.info("NULL Location: returning an empty location hint");
+            return new TaskLocationHint(null,null);
+          }
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    });
+
+    return Lists.newArrayList(iterable);
+  }
+}
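To make estimateBucketSizes() concrete with assumed numbers: with 16384 MB of total available resource, 1024 MB per task, and waves = 1.7, numTasks = (int)((16384 * 1.7) / 1024) = 27. A bucket holding 60% of the total split bytes is then given (int)(27 * 0.6) = 16 grouped-split tasks, while a bucket holding less than 1/27 of the bytes rounds down to 0 and is bumped to the minimum of one task, so every bucket gets at least one grouped split.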

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Mar 28 05:53:12 2014
@@ -24,12 +24,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import javax.security.auth.login.LoginException;
@@ -59,13 +61,16 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
+import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure.NullOutputCommitter;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -80,6 +85,7 @@ import org.apache.hadoop.yarn.util.Conve
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
@@ -89,6 +95,7 @@ import org.apache.tez.dag.api.InputDescr
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.api.VertexLocationHint;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.client.PreWarmContext;
@@ -129,7 +136,7 @@ public class DagUtils {
           return new Path(input).toUri();
         }
       });
-    
+
       Set<URI> uris = new HashSet<URI>();
       Iterators.addAll(uris, pathIterator);
 
@@ -201,31 +208,45 @@ public class DagUtils {
    * @param group The parent VertexGroup
    * @param wConf The job conf of the child vertex
    * @param w The child vertex
-   * @param edgeType the type of connection between the two
+   * @param edgeProp the edge property of connection between the two
    * endpoints.
    */
   public GroupInputEdge createEdge(VertexGroup group, JobConf wConf,
-      Vertex w, EdgeType edgeType)
-      throws IOException {
-    
+      Vertex w, TezEdgeProperty edgeProp)
+    throws IOException {
+
     Class mergeInputClass;
-    
+
     LOG.info("Creating Edge between " + group.getGroupName() + " and " + w.getVertexName());
     w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
 
+    EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
-    case BROADCAST_EDGE:
-      mergeInputClass = ConcatenatedMergedKeyValueInput.class;
-      break;
-
-    case SIMPLE_EDGE:
-    default:
-      mergeInputClass = TezMergedLogicalInput.class;
-      break;
+      case BROADCAST_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        break;
+      case CUSTOM_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        int numBuckets = edgeProp.getNumBuckets();
+        VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+            CustomPartitionVertex.class.getName());
+        byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
+        desc.setUserPayload(userPayload);
+        w.setVertexManagerPlugin(desc);
+        break;
+
+      case CUSTOM_SIMPLE_EDGE:
+        mergeInputClass = ConcatenatedMergedKeyValueInput.class;
+        break;
+
+      case SIMPLE_EDGE:
+      default:
+        mergeInputClass = TezMergedLogicalInput.class;
+        break;
     }
 
-    return new GroupInputEdge(group, w, createEdgeProperty(edgeType),
-         new InputDescriptor(mergeInputClass.getName()));
+    return new GroupInputEdge(group, w, createEdgeProperty(edgeProp),
+        new InputDescriptor(mergeInputClass.getName()));
   }
 
   /**
@@ -253,43 +274,83 @@ public class DagUtils {
    * @return
    */
   public Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w,
-      EdgeType edgeType)
-      throws IOException {
+      TezEdgeProperty edgeProp)
+    throws IOException {
 
     updateConfigurationForEdge(vConf, v, wConf, w);
 
-    return new Edge(v, w, createEdgeProperty(edgeType));
+    if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+      int numBuckets = edgeProp.getNumBuckets();
+      byte[] userPayload = ByteBuffer.allocate(4).putInt(numBuckets).array();
+      VertexManagerPluginDescriptor desc = new VertexManagerPluginDescriptor(
+          CustomPartitionVertex.class.getName());
+      desc.setUserPayload(userPayload);
+      w.setVertexManagerPlugin(desc);
+    }
+
+    return new Edge(v, w, createEdgeProperty(edgeProp));
   }
 
   /*
    * Helper function to create an edge property from an edge type.
    */
-  private EdgeProperty createEdgeProperty(EdgeType edgeType) {
+  private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp) throws IOException {
     DataMovementType dataMovementType;
     Class logicalInputClass;
     Class logicalOutputClass;
 
+    EdgeProperty edgeProperty = null;
+    EdgeType edgeType = edgeProp.getEdgeType();
     switch (edgeType) {
-    case BROADCAST_EDGE:
-      dataMovementType = DataMovementType.BROADCAST;
-      logicalOutputClass = OnFileUnorderedKVOutput.class;
-      logicalInputClass = ShuffledUnorderedKVInput.class;
-      break;
-
-    case SIMPLE_EDGE:
-    default:
-      dataMovementType = DataMovementType.SCATTER_GATHER;
-      logicalOutputClass = OnFileSortedOutput.class;
-      logicalInputClass = ShuffledMergedInputLegacy.class;
-      break;
+      case BROADCAST_EDGE:
+        dataMovementType = DataMovementType.BROADCAST;
+        logicalOutputClass = OnFileUnorderedKVOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        break;
+
+      case CUSTOM_EDGE:
+        
+        dataMovementType = DataMovementType.CUSTOM;
+        logicalOutputClass = OnFileSortedOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
+            CustomPartitionEdge.class.getName());
+        CustomEdgeConfiguration edgeConf = 
+            new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
+          DataOutputBuffer dob = new DataOutputBuffer();
+          edgeConf.write(dob);
+          byte[] userPayload = dob.getData();
+        edgeDesc.setUserPayload(userPayload);
+        edgeProperty =
+          new EdgeProperty(edgeDesc,
+              DataSourceType.PERSISTED,
+              SchedulingType.SEQUENTIAL,
+              new OutputDescriptor(logicalOutputClass.getName()),
+              new InputDescriptor(logicalInputClass.getName()));
+        break;
+
+      case CUSTOM_SIMPLE_EDGE:
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+        logicalOutputClass = OnFileSortedOutput.class;
+        logicalInputClass = ShuffledUnorderedKVInput.class;
+        break;
+
+      case SIMPLE_EDGE:
+      default:
+        dataMovementType = DataMovementType.SCATTER_GATHER;
+        logicalOutputClass = OnFileSortedOutput.class;
+        logicalInputClass = ShuffledMergedInputLegacy.class;
+        break;
     }
 
-    EdgeProperty edgeProperty =
+    if (edgeProperty == null) {
+      edgeProperty =
         new EdgeProperty(dataMovementType,
             DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL,
             new OutputDescriptor(logicalOutputClass.getName()),
             new InputDescriptor(logicalInputClass.getName()));
+    }
 
     return edgeProperty;
   }
@@ -305,7 +366,7 @@ public class DagUtils {
       HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
       conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
     int cpus = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
-                           MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+        MRJobConfig.DEFAULT_MAP_CPU_VCORES);
     return Resource.newInstance(memory, cpus);
   }
 
@@ -328,7 +389,7 @@ public class DagUtils {
    */
   private Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx) throws Exception {
+      Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception {
 
     Path tezDir = getTezDir(mrScratchDir);
 
@@ -353,11 +414,30 @@ public class DagUtils {
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
-    // we'll set up tez to combine spits for us iff the input format
-    // is HiveInputFormat
-    if (inputFormatClass == HiveInputFormat.class) {
-      useTezGroupedSplits = true;
-      conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+    boolean vertexHasCustomInput = false;
+    if (tezWork != null) {
+      for (BaseWork baseWork : tezWork.getParents(mapWork)) {
+        if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) {
+          vertexHasCustomInput = true;
+        }
+      }
+    }
+    if (vertexHasCustomInput) {
+      useTezGroupedSplits = false;
+      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat 
+      // here would cause pre-mature grouping which would be incorrect.
+      inputFormatClass = HiveInputFormat.class;
+      conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+      // mapreduce.tez.input.initializer.serialize.event.payload should be set to false when using
+      // this plug-in to avoid getting a serialized event at run-time.
+      conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
+    } else {
+      // we'll set up tez to combine splits for us iff the input format
+      // is HiveInputFormat
+      if (inputFormatClass == HiveInputFormat.class) {
+        useTezGroupedSplits = true;
+        conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+      }
     }
 
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)) {
@@ -374,7 +454,7 @@ public class DagUtils {
     byte[] serializedConf = MRHelpers.createUserPayloadFromConf(conf);
     map = new Vertex(mapWork.getName(),
         new ProcessorDescriptor(MapTezProcessor.class.getName()).
-             setUserPayload(serializedConf), numTasks, getContainerResource(conf));
+        setUserPayload(serializedConf), numTasks, getContainerResource(conf));
     Map<String, String> environment = new HashMap<String, String>();
     MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
     map.setTaskEnvironment(environment);
@@ -393,7 +473,7 @@ public class DagUtils {
     }
     map.addInput(alias,
         new InputDescriptor(MRInputLegacy.class.getName()).
-               setUserPayload(mrInput), amSplitGeneratorClass);
+        setUserPayload(mrInput), amSplitGeneratorClass);
 
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
     localResources.put(getBaseName(appJarLr), appJarLr);
@@ -447,7 +527,7 @@ public class DagUtils {
     // create the vertex
     Vertex reducer = new Vertex(reduceWork.getName(),
         new ProcessorDescriptor(ReduceTezProcessor.class.getName()).
-             setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
+        setUserPayload(MRHelpers.createUserPayloadFromConf(conf)),
         reduceWork.getNumReduceTasks(), getContainerResource(conf));
 
     Map<String, String> environment = new HashMap<String, String>();
@@ -501,7 +581,7 @@ public class DagUtils {
    * @return prewarm context object
    */
   public PreWarmContext createPreWarmContext(TezSessionConfiguration sessionConfig, int numContainers,
-               Map<String, LocalResource> localResources) throws IOException, TezException {
+      Map<String, LocalResource> localResources) throws IOException, TezException {
 
     Configuration conf = sessionConfig.getTezConfiguration();
 
@@ -524,7 +604,7 @@ public class DagUtils {
     }
 
     if(localResources != null) {
-       combinedResources.putAll(localResources);
+      combinedResources.putAll(localResources);
     }
 
     context.setLocalResources(combinedResources);
@@ -641,7 +721,7 @@ public class DagUtils {
 
   // the api that finds the jar being used by this class on disk
   public String getExecJarPathLocal () throws URISyntaxException {
-      // returns the location on disc of the jar of this class.
+    // returns the location on disc of the jar of this class.
     return DagUtils.class.getProtectionDomain().getCodeSource().getLocation().toURI().toString();
   }
 
@@ -670,7 +750,7 @@ public class DagUtils {
    * @throws IOException when any file system related call fails
    */
   private boolean checkPreExisting(Path src, Path dest, Configuration conf)
-      throws IOException {
+    throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
 
     if (!destFS.exists(dest)) {
@@ -699,7 +779,7 @@ public class DagUtils {
    * @throws IOException when any file system related calls fails.
    */
   public LocalResource localizeResource(Path src, Path dest, Configuration conf)
-      throws IOException {
+    throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
     if (!(destFS instanceof DistributedFileSystem)) {
       throw new IOException(ErrorMsg.INVALID_HDFS_URI.format(dest.toString()));
@@ -776,6 +856,7 @@ public class DagUtils {
    * @param work The instance of BaseWork representing the actual work to be performed
    * by this vertex.
    * @param scratchDir HDFS scratch dir for this execution unit.
+   * @param tezWork
    * @param appJarLr Local resource for hive-exec.
    * @param additionalLr
    * @param fileSystem FS corresponding to scratchDir and LocalResources
@@ -783,15 +864,16 @@ public class DagUtils {
    * @return Vertex
    */
   public Vertex createVertex(JobConf conf, BaseWork work,
-      Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr,
-      FileSystem fileSystem, Context ctx, boolean hasChildren) throws Exception {
+      Path scratchDir, LocalResource appJarLr, 
+      List<LocalResource> additionalLr,
+      FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
 
     Vertex v = null;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
       v = createVertex(conf, (MapWork) work, appJarLr,
-          additionalLr, fileSystem, scratchDir, ctx);
+          additionalLr, fileSystem, scratchDir, ctx, tezWork);
     } else if (work instanceof ReduceWork) {
       v = createVertex(conf, (ReduceWork) work, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
@@ -820,7 +902,7 @@ public class DagUtils {
     if (!hasChildren) {
       v.addOutput("out_"+work.getName(),
           new OutputDescriptor(MROutput.class.getName())
-               .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
+          .setUserPayload(MRHelpers.createUserPayloadFromConf(conf)));
     }
 
     return v;
@@ -842,7 +924,7 @@ public class DagUtils {
    * be used with Tez. Assumes scratchDir exists.
    */
   public Path createTezDir(Path scratchDir, Configuration conf)
-      throws IOException {
+    throws IOException {
     Path tezDir = getTezDir(scratchDir);
     FileSystem fs = tezDir.getFileSystem(conf);
     fs.mkdirs(tezDir);

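The createVertex overload changed above simply dispatches on the concrete BaseWork subtype and now also threads the enclosing TezWork through to the MapWork path (presumably so that path can consult edge information; the other paths are unchanged). A minimal sketch of that dispatch pattern, using stand-in types rather than the real DagUtils/plan classes:

    // Sketch only: stand-in types illustrating createVertex's dispatch-by-subtype
    // pattern, with the TezWork handed down to the MapWork overload. Not the
    // actual Hive API.
    class VertexDispatchSketch {
      static abstract class BaseWork {}
      static class MapWork extends BaseWork {}
      static class ReduceWork extends BaseWork {}
      static class TezWork {}
      static class Vertex { final String kind; Vertex(String kind) { this.kind = kind; } }

      static Vertex createVertex(BaseWork work, TezWork tezWork) {
        // dispatch the call to the right method for the actual (sub-)type of BaseWork
        if (work instanceof MapWork) {
          return createMapVertex((MapWork) work, tezWork);
        } else if (work instanceof ReduceWork) {
          return createReduceVertex((ReduceWork) work);
        }
        throw new IllegalArgumentException("unknown work type: " + work.getClass());
      }

      static Vertex createMapVertex(MapWork work, TezWork tezWork) { return new Vertex("map"); }
      static Vertex createReduceVertex(ReduceWork work) { return new Vertex("reduce"); }
    }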
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Fri Mar 28 05:53:12 2014
@@ -49,7 +49,7 @@ import org.apache.tez.runtime.library.ap
  */
 public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTableLoader {
 
-  private static final Log LOG = LogFactory.getLog(MapJoinOperator.class.getName());
+  private static final Log LOG = LogFactory.getLog(HashTableLoader.class.getName());
 
   private ExecMapperContext context;
   private Configuration hconf;
@@ -122,8 +122,13 @@ public class HashTableLoader implements 
         throw new HiveException(e);
       }
       // Register that the Input has been cached.
-      tezCacheAccess.registerCachedInput(inputName);
-      LOG.info("Setting Input: " + inputName + " as cached");
+      LOG.info("Is this a bucket map join: " + desc.isBucketMapJoin());
+      // cache is disabled for bucket map join because of the same reason
+      // given in loadHashTable in MapJoinOperator.
+      if (!desc.isBucketMapJoin()) {
+        tezCacheAccess.registerCachedInput(inputName);
+        LOG.info("Setting Input: " + inputName + " as cached");
+      }
     }
     if (lastKey == null) {
       lastKey = new MapJoinKeyObject(); // No rows in tables, the key type doesn't matter.

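The HashTableLoader hunk above now skips registering the hash-table input as cached when the join is a bucket map join, citing the reason noted in MapJoinOperator.loadHashTable. A stand-alone sketch of that guard (CacheRegistry here is a stand-in, not the real Tez cache access object):

    // Sketch: cache registration is skipped for bucket map joins, mirroring the
    // guard added to HashTableLoader. CacheRegistry is a stand-in interface.
    interface CacheRegistry { void registerCachedInput(String inputName); }

    class CachedInputSketch {
      static void maybeRegister(CacheRegistry cache, String inputName, boolean isBucketMapJoin) {
        // cache is disabled for bucket map join (same reason as in MapJoinOperator.loadHashTable)
        if (!isBucketMapJoin) {
          cache.registerCachedInput(inputName);
        }
      }
    }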
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java Fri Mar 28 05:53:12 2014
@@ -149,6 +149,7 @@ public class TezProcessor implements Log
       // Start the actual Inputs. After MRInput initialization.
       for (Entry<String, LogicalInput> inputEntry : inputs.entrySet()) {
         if (!cacheAccess.isInputCached(inputEntry.getKey())) {
+          LOG.info("Input: " + inputEntry.getKey() + " is not cached");
           inputEntry.getValue().start();
         } else {
           LOG.info("Input: " + inputEntry.getKey() + " is already cached. Skipping start");

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Mar 28 05:53:12 2014
@@ -42,9 +42,10 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.JobConf;
@@ -222,7 +223,7 @@ public class TezTask extends Task<TezWor
         // split the children into vertices that make up the union and vertices that are
         // proper children of the union
         for (BaseWork v: work.getChildren(w)) {
-          EdgeType type = work.getEdgeProperty(w, v);
+          EdgeType type = work.getEdgeProperty(w, v).getEdgeType();
           if (type == EdgeType.CONTAINS) {
             unionWorkItems.add(v);
           } else {
@@ -257,7 +258,7 @@ public class TezTask extends Task<TezWor
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, w);
         Vertex wx = utils.createVertex(wxConf, w, tezDir, appJarLr, 
-          additionalLr, fs, ctx, !isFinal);
+          additionalLr, fs, ctx, !isFinal, work);
         dag.addVertex(wx);
         utils.addCredentials(w, dag);
         perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
@@ -269,9 +270,9 @@ public class TezTask extends Task<TezWor
           assert workToVertex.containsKey(v);
           Edge e = null;
 
-          EdgeType edgeType = work.getEdgeProperty(w, v);
-          
-          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeType);
+          TezEdgeProperty edgeProp = work.getEdgeProperty(w, v);
+
+          e = utils.createEdge(wxConf, wx, workToConf.get(v), workToVertex.get(v), edgeProp);
           dag.addEdge(e);
         }
       }

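TezTask now reads edge metadata through TezWork.getEdgeProperty(w, v), which returns a TezEdgeProperty; the edge type moved from TezWork.EdgeType to TezEdgeProperty.EdgeType and is read via getEdgeType(). A simplified sketch of the consumer side (two-argument stand-in class; the real constructor used in ReduceSinkMapJoinProc below also takes a configuration argument):

    // Sketch: edge metadata travels as a TezEdgeProperty object; the edge type is
    // one field of it. Stand-in class, not the real org.apache.hadoop.hive.ql.plan type.
    class TezEdgePropertySketch {
      enum EdgeType { CONTAINS, BROADCAST_EDGE, CUSTOM_EDGE, CUSTOM_SIMPLE_EDGE }

      static class TezEdgeProperty {
        private final EdgeType edgeType;
        private final int numBuckets;
        TezEdgeProperty(EdgeType edgeType, int numBuckets) {
          this.edgeType = edgeType;
          this.numBuckets = numBuckets;
        }
        EdgeType getEdgeType() { return edgeType; }
        int getNumBuckets() { return numBuckets; }
      }

      static boolean isUnionMember(TezEdgeProperty prop) {
        // TezTask asks the property for its type instead of receiving the type directly
        return prop.getEdgeType() == EdgeType.CONTAINS;
      }
    }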
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/io/DefaultHivePartitioner.java Fri Mar 28 05:53:12 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.io;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.lib.HashPartitioner;
 
 /** Partition keys by their {@link Object#hashCode()}. */

Added: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java?rev=1582613&view=auto
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java (added)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/lib/ForwardWalker.java Fri Mar 28 05:53:12 2014
@@ -0,0 +1,70 @@
+package org.apache.hadoop.hive.ql.lib;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+public class ForwardWalker extends DefaultGraphWalker {
+
+  /**
+* Constructor.
+*
+* @param disp
+* dispatcher to call for each op encountered
+*/
+  public ForwardWalker(Dispatcher disp) {
+    super(disp);
+  }
+
+  @SuppressWarnings("unchecked")
+  protected boolean allParentsDispatched(Node nd) {
+    Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+    if (op.getParentOperators() == null) {
+      return true;
+    }
+    for (Node pNode : op.getParentOperators()) {
+      if (!getDispatchedList().contains(pNode)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected void addAllParents(Node nd) {
+    Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+    if (op.getParentOperators() == null) {
+      return;
+    }
+    getToWalk().removeAll(op.getParentOperators());
+    getToWalk().addAll(0, op.getParentOperators());
+  }
+
+  /**
+* walk the current operator and its descendants.
+*
+* @param nd
+* current operator in the graph
+* @throws SemanticException
+*/
+  public void walk(Node nd) throws SemanticException {
+    if (opStack.empty() || nd != opStack.peek()) {
+      opStack.push(nd);
+    }
+    if (allParentsDispatched(nd)) {
+      // all children are done or no need to walk the children
+      if (!getDispatchedList().contains(nd)) {
+        getToWalk().addAll(nd.getChildren());
+        dispatch(nd, opStack);
+      }
+      opStack.pop();
+      return;
+    }
+    // add children, self to the front of the queue in that order
+    getToWalk().add(0, nd);
+    addAllParents(nd);
+  }
+}
\ No newline at end of file

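ForwardWalker, added above, dispatches a node only once every parent operator has been dispatched; otherwise it re-queues the node and pushes its un-dispatched parents to the front of the walk list. A self-contained sketch of that ordering rule over a toy DAG (toy node type, not Hive's Node/Operator/Dispatcher interfaces):

    // Sketch: forward walk in which a node is visited only after all of its
    // parents have been visited, mirroring ForwardWalker's re-queueing strategy.
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class ForwardWalkSketch {
      static class ToyNode {
        final String name;
        final List<ToyNode> parents = new ArrayList<>();
        final List<ToyNode> children = new ArrayList<>();
        ToyNode(String name) { this.name = name; }
      }

      static List<String> walk(List<ToyNode> startNodes) {
        List<String> dispatched = new ArrayList<>();
        Set<ToyNode> done = new HashSet<>();
        Deque<ToyNode> toWalk = new ArrayDeque<>(startNodes);
        while (!toWalk.isEmpty()) {
          ToyNode nd = toWalk.pollFirst();
          if (done.contains(nd)) {
            continue;                                // already dispatched via another path
          }
          if (done.containsAll(nd.parents)) {
            done.add(nd);
            dispatched.add(nd.name);                 // "dispatch" the node
            toWalk.addAll(nd.children);              // children become candidates afterwards
          } else {
            toWalk.addFirst(nd);                     // revisit once parents are handled
            for (int i = nd.parents.size() - 1; i >= 0; i--) {
              if (!done.contains(nd.parents.get(i))) {
                toWalk.addFirst(nd.parents.get(i));  // un-dispatched parents go first
              }
            }
          }
        }
        return dispatched;
      }
    }

For a diamond-shaped plan a -> {b, c} -> d, starting the walk at a dispatches a, then b and c, and d only after both of its parents have been handled.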
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractBucketJoinProc.java Fri Mar 28 05:53:12 2014
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Stack;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -75,7 +74,7 @@ abstract public class AbstractBucketJoin
   abstract public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
       Object... nodeOutputs) throws SemanticException;
 
-  private static List<String> getBucketFilePathsOfPartition(
+  public static List<String> getBucketFilePathsOfPartition(
       Path location, ParseContext pGraphContext) throws SemanticException {
     List<String> fileNames = new ArrayList<String>();
     try {
@@ -134,7 +133,7 @@ abstract public class AbstractBucketJoin
       ParseContext pGraphContext,
       BucketJoinProcCtx context) throws SemanticException {
 
-    QBJoinTree joinCtx = this.pGraphContext.getMapJoinContext().get(mapJoinOp);
+    QBJoinTree joinCtx = pGraphContext.getMapJoinContext().get(mapJoinOp);
     if (joinCtx == null) {
       return false;
     }
@@ -454,7 +453,7 @@ abstract public class AbstractBucketJoin
     return converted;
   }
 
-  public List<String> toColumns(List<ExprNodeDesc> keys) {
+  public static List<String> toColumns(List<ExprNodeDesc> keys) {
     List<String> columns = new ArrayList<String>();
     for (ExprNodeDesc key : keys) {
       if (!(key instanceof ExprNodeColumnDesc)) {

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Fri Mar 28 05:53:12 2014
@@ -18,23 +18,31 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.parse.OptimizeTezProcContext;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
 
@@ -50,15 +58,15 @@ public class ConvertJoinMapJoin implemen
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
 
   @Override
-  /*
-   * (non-Javadoc)
-   * we should ideally not modify the tree we traverse.
-   * However, since we need to walk the tree at any time when we modify the
-   * operator, we might as well do it here.
-   */
-  public Object process(Node nd, Stack<Node> stack,
-      NodeProcessorCtx procCtx, Object... nodeOutputs)
-      throws SemanticException {
+    /*
+     * (non-Javadoc)
+     * we should ideally not modify the tree we traverse.
+     * However, since we need to walk the tree at any time when we modify the
+     * operator, we might as well do it here.
+     */
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procCtx, Object... nodeOutputs)
+    throws SemanticException {
 
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
@@ -67,12 +75,178 @@ public class ConvertJoinMapJoin implemen
     }
 
     JoinOperator joinOp = (JoinOperator) nd;
+    // if we have traits, and table info is present in the traits, we know the 
+    // exact number of buckets. Else choose the largest number of estimated
+    // reducers from the parent operators.
+    int numBuckets = -1;
+    int estimatedBuckets = -1;
+    for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
+      if (parentOp.getOpTraits().getNumBuckets() > 0) {
+        numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? 
+            parentOp.getOpTraits().getNumBuckets() : numBuckets; 
+      }
+      ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
+      estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? 
+          rs.getConf().getNumReducers() : estimatedBuckets;
+    }
+
+    if (numBuckets <= 0) {
+      numBuckets = estimatedBuckets;
+      if (numBuckets <= 0) {
+        numBuckets = 1;
+      }
+    }
+    LOG.info("Estimated number of buckets " + numBuckets);
+    int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
+    if (mapJoinConversionPos < 0) {
+      return null;
+    }
+
+    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+      if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
+        return null;
+      }
+    }
+
+    LOG.info("Convert to non-bucketed map join");
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
+    // map join operator by default has no bucket cols
+    mapJoinOp.setOpTraits(new OpTraits(null, -1));
+    // propagate this change till the next RS
+    for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
+      setAllChildrenTraitsToNull(childOp);
+    }
+
+    return null;
+  }
+
+  private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
+    if (currentOp instanceof ReduceSinkOperator) {
+      return;
+    }
+    currentOp.setOpTraits(new OpTraits(null, -1));
+    for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
+      if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
+        break;
+      }
+      setAllChildrenTraitsToNull(childOp);
+    }
+  }
+
+  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
+      int bigTablePosition) throws SemanticException {
+
+    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+
+    if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
+      LOG.info("Check conversion to bucket map join failed.");
+      return false;
+    }
+
+    MapJoinOperator mapJoinOp = 
+      convertJoinMapJoin(joinOp, context, bigTablePosition);
+    MapJoinDesc joinDesc = mapJoinOp.getConf();
+    joinDesc.setBucketMapJoin(true);
+
+    // we can set the traits for this join operator
+    OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
+        tezBucketJoinProcCtx.getNumBuckets());
+    mapJoinOp.setOpTraits(opTraits);
+    setNumberOfBucketsOnChildren(mapJoinOp);
+
+    // Once the conversion is done, we can set the partitioner to bucket cols on the small table    
+    Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
+    bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
+    joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
+    LOG.info("Setting legacy map join to " + (!tezBucketJoinProcCtx.isSubQuery()));
+    joinDesc.setCustomBucketMapJoin(!tezBucketJoinProcCtx.isSubQuery());
+
+    return true;
+  }
+
+  private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
+    int numBuckets = currentOp.getOpTraits().getNumBuckets();
+    for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
+      if (!(op instanceof ReduceSinkOperator) && !(op instanceof GroupByOperator)) {
+        op.getOpTraits().setNumBuckets(numBuckets);
+        setNumberOfBucketsOnChildren(op);
+      }
+    }
+  }
 
+  /*
+   *  We perform the following checks to see if we can convert to a bucket map join
+   *  1. If the parent reduce sink of the big table side has the same emit key cols as 
+   *  its parent, we can create a bucket map join eliminating the reduce sink.
+   *  2. If we have the table information, we can check the same way as in Mapreduce to 
+   *  determine if we can perform a Bucket Map Join.
+   */
+  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, 
+      OptimizeTezProcContext context, int bigTablePosition, 
+      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+    // bail on mux-operator because mux operator masks the emit keys of the
+    // constituent reduce sinks
+    if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
+      LOG.info("Operator is " + joinOp.getParentOperators().get(0).getName() +
+          ". Cannot convert to bucket map join");
+      return false;
+    }
+
+    ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    /*
+     * this is the case when the big table is a sub-query and is probably
+     * already bucketed by the join column in say a group by operation 
+     */
+    List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
+    if ((colNames != null) && (colNames.isEmpty() == false)) {
+      Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
+      for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
+        // can happen if this operator does not carry forward the previous bucketing columns
+        // for e.g. another join operator which does not carry one of the sides' key columns
+        if (listBucketCols.isEmpty()) {
+          continue;
+        }
+        int colCount = 0;
+        // parent op is guaranteed to have a single list because it is a reduce sink
+        for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
+          // all columns need to be at least a subset of the parentOfParent's bucket cols
+          ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
+          if (exprNodeDesc instanceof ExprNodeColumnDesc) {
+            if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
+              colCount++;
+            } else {
+              break;
+            }
+          }
+          
+          if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
+            // all keys matched.
+            int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+            boolean isSubQuery = false;
+            if (numBuckets < 0) {
+              isSubQuery = true;
+              numBuckets = rs.getConf().getNumReducers();
+            }
+            tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+            tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+            return true;
+          }
+        }
+      }
+      return false;
+    }
+
+    LOG.info("No info available to check for bucket map join. Cannot convert");
+    return false;
+  }
+
+  public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, 
+      int buckets) {
     Set<Integer> bigTableCandidateSet = MapJoinProcessor.
       getBigTableCandidates(joinOp.getConf().getConds());
 
     long maxSize = context.conf.getLongVar(
-      HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
+        HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
 
     int bigTablePosition = -1;
 
@@ -89,7 +263,7 @@ public class ConvertJoinMapJoin implemen
       Statistics currInputStat = parentOp.getStatistics();
       if (currInputStat == null) {
         LOG.warn("Couldn't get statistics from: "+parentOp);
-        return null;
+        return -1;
       }
 
       long inputSize = currInputStat.getDataSize();
@@ -100,14 +274,14 @@ public class ConvertJoinMapJoin implemen
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
           // on size and there's another one that's bigger.
-          return null;
+          return -1;
         }
 
-        if (inputSize > maxSize) {
+        if (inputSize/buckets > maxSize) {
           if (!bigTableCandidateSet.contains(pos)) {
             // can't use the current table as the big table, but it's too
             // big for the map side.
-            return null;
+            return -1;
           }
 
           bigTableFound = true;
@@ -119,10 +293,10 @@ public class ConvertJoinMapJoin implemen
           totalSize += bigInputStat.getDataSize();
         }
 
-        if (totalSize > maxSize) {
+        if (totalSize/buckets > maxSize) {
           // sum of small tables size in this join exceeds configured limit
           // hence cannot convert.
-          return null;
+          return -1;
         }
 
         if (bigTableCandidateSet.contains(pos)) {
@@ -131,37 +305,45 @@ public class ConvertJoinMapJoin implemen
         }
       } else {
         totalSize += currInputStat.getDataSize();
-        if (totalSize > maxSize) {
+        if (totalSize/buckets > maxSize) {
           // cannot hold all map tables in memory. Cannot convert.
-          return null;
+          return -1;
         }
       }
       pos++;
     }
 
-    if (bigTablePosition == -1) {
-      // all tables have size 0. We let the shuffle join handle this case.
-      return null;
-    }
+    return bigTablePosition;
+  }
 
-    /*
-     * Once we have decided on the map join, the tree would transform from
-     *
-     *        |                   |
-     *       Join               MapJoin
-     *       / \                /   \
-     *      RS RS   --->      RS    TS (big table)
-     *      /   \            /
-     *    TS     TS         TS (small table)
-     *
-     * for tez.
-     */
+  /*
+   * Once we have decided on the map join, the tree would transform from
+   *
+   *        |                   |
+   *       Join               MapJoin
+   *       / \                /   \
+   *     RS   RS   --->     RS    TS (big table)
+   *    /      \           /
+   *   TS       TS        TS (small table)
+   *
+   * for tez.
+   */
+
+  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
+      int bigTablePosition) throws SemanticException {
+    // bail on mux operator because currently the mux operator masks the emit keys 
+    // of the constituent reduce sinks.
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      if (parentOp instanceof MuxOperator) {
+        return null;
+      }
+    }
 
-    // convert to a map join operator with this information
+    //can safely convert the join to a map join.
     ParseContext parseContext = context.parseContext;
     MapJoinOperator mapJoinOp = MapJoinProcessor.
       convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
-      joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+          joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
 
     Operator<? extends OperatorDesc> parentBigTableOp
       = mapJoinOp.getParentOperators().get(bigTablePosition);
@@ -169,10 +351,10 @@ public class ConvertJoinMapJoin implemen
     if (parentBigTableOp instanceof ReduceSinkOperator) {
       mapJoinOp.getParentOperators().remove(bigTablePosition);
       if (!(mapJoinOp.getParentOperators().contains(
-          parentBigTableOp.getParentOperators().get(0)))) {
+              parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
-          parentBigTableOp.getParentOperators().get(0));
-      }
+            parentBigTableOp.getParentOperators().get(0));
+              }
       parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
       for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
         if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -182,6 +364,6 @@ public class ConvertJoinMapJoin implemen
       }
     }
 
-    return null;
+    return mapJoinOp;
   }
 }

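In mapJoinConversionPos above, the small-table size tests now divide by the bucket count (inputSize/buckets and totalSize/buckets against the no-conditional-task threshold), so a side that would be too large to broadcast whole can still qualify once it is split across buckets. A tiny sketch of that arithmetic with a hypothetical 10 MB threshold:

    // Sketch: per-bucket size test used when deciding whether the small side(s) of a
    // join fit in memory. Threshold and sizes below are made-up example numbers.
    class MapJoinSizeCheckSketch {
      static boolean fitsInMemory(long smallSideBytes, int buckets, long maxBytes) {
        // mirrors the "totalSize / buckets > maxSize" rejection in mapJoinConversionPos
        return smallSideBytes / buckets <= maxBytes;
      }

      public static void main(String[] args) {
        long maxBytes = 10L * 1024 * 1024;                                 // hypothetical threshold
        System.out.println(fitsInMemory(32L * 1024 * 1024, 1, maxBytes));  // false: 32 MB > 10 MB
        System.out.println(fitsInMemory(32L * 1024 * 1024, 4, maxBytes));  // true: 8 MB per bucket
      }
    }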
Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Mar 28 05:53:12 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.index.RewriteGBUsingIndex;
 import org.apache.hadoop.hive.ql.optimizer.lineage.Generator;
 import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPruner;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.pcr.PartitionConditionRemover;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
@@ -124,7 +125,9 @@ public class Optimizer {
     if (pctx.getContext().getExplain() ||
         HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       transformations.add(new AnnotateWithStatistics());
+      transformations.add(new AnnotateWithOpTraits());
     }
+
     transformations.add(new SimpleFetchOptimizer());  // must be called last
 
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEFETCHTASKAGGR)) {

Modified: hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1582613&r1=1582612&r2=1582613&view=diff
==============================================================================
--- hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/branch-0.13/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Fri Mar 28 05:53:12 2014
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
@@ -44,8 +45,9 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
 
@@ -110,12 +112,24 @@ public class ReduceSinkMapJoinProc imple
     LOG.debug("Mapjoin "+mapJoinOp+", pos: "+pos+" --> "+parentWork.getName());
     mapJoinOp.getConf().getParentToInput().put(pos, parentWork.getName());
 
+    int numBuckets = -1;
+    EdgeType edgeType = EdgeType.BROADCAST_EDGE;
+    if (mapJoinOp.getConf().isBucketMapJoin()) {
+      numBuckets = (Integer) mapJoinOp.getConf().getBigTableBucketNumMapping().values().toArray()[0];
+      if (mapJoinOp.getConf().getCustomBucketMapJoin()) {
+        edgeType = EdgeType.CUSTOM_EDGE;
+      } else {
+        edgeType = EdgeType.CUSTOM_SIMPLE_EDGE;
+      }
+    }
+    TezEdgeProperty edgeProp = new TezEdgeProperty(null, edgeType, numBuckets);
+
     if (mapJoinWork != null) {
       for (BaseWork myWork: mapJoinWork) {
         // link the work with the work associated with the reduce sink that triggered this rule
         TezWork tezWork = context.currentTask.getWork();
         LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
-        tezWork.connect(parentWork, myWork, EdgeType.BROADCAST_EDGE);
+        tezWork.connect(parentWork, myWork, edgeProp);
         
         ReduceSinkOperator r = null;
         if (parentRS.getConf().getOutputName() != null) {
@@ -134,12 +148,14 @@ public class ReduceSinkMapJoinProc imple
     }
 
     // remember in case we need to connect additional work later
-    List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(mapJoinOp);
-    if (linkWorkList == null) {
-      linkWorkList = new ArrayList<BaseWork>();
+    Map<BaseWork, TezEdgeProperty> linkWorkMap = null;
+    if (context.linkOpWithWorkMap.containsKey(mapJoinOp)) {
+      linkWorkMap = context.linkOpWithWorkMap.get(mapJoinOp);
+    } else {
+      linkWorkMap = new HashMap<BaseWork, TezEdgeProperty>();
     }
-    linkWorkList.add(parentWork);
-    context.linkOpWithWorkMap.put(mapJoinOp, linkWorkList);
+    linkWorkMap.put(parentWork, edgeProp);
+    context.linkOpWithWorkMap.put(mapJoinOp, linkWorkMap);
     
     List<ReduceSinkOperator> reduceSinks 
       = context.linkWorkWithReduceSinkMap.get(parentWork);

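ReduceSinkMapJoinProc now derives the Tez edge type from the map-join descriptor (BROADCAST_EDGE for a plain map join; CUSTOM_EDGE or CUSTOM_SIMPLE_EDGE for a bucket map join, depending on getCustomBucketMapJoin()) and records a TezEdgeProperty per linked work item instead of a bare list. A sketch of just the selection step, with stand-in booleans for the MapJoinDesc flags:

    // Sketch: edge-type selection mirroring the hunk above. The two booleans stand in
    // for MapJoinDesc.isBucketMapJoin() and MapJoinDesc.getCustomBucketMapJoin().
    class EdgeSelectionSketch {
      enum EdgeType { BROADCAST_EDGE, CUSTOM_EDGE, CUSTOM_SIMPLE_EDGE }

      static EdgeType pickEdgeType(boolean isBucketMapJoin, boolean isCustomBucketMapJoin) {
        if (!isBucketMapJoin) {
          return EdgeType.BROADCAST_EDGE;   // default: broadcast the small table to every task
        }
        return isCustomBucketMapJoin ? EdgeType.CUSTOM_EDGE : EdgeType.CUSTOM_SIMPLE_EDGE;
      }
    }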

