pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cheol...@apache.org
Subject svn commit: r1601782 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Date Tue, 10 Jun 2014 22:32:27 GMT
Author: cheolsoo
Date: Tue Jun 10 22:32:27 2014
New Revision: 1601782

URL: http://svn.apache.org/r1601782
Log:
PIG-3959: Skewed join followed by replicated join fails in Tez (cheolsoo)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1601782&r1=1601781&r2=1601782&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jun 10 22:32:27 2014
@@ -32,6 +32,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3959: Skewed join followed by replicated join fails in Tez (cheolsoo)
+
 PIG-3995: Tez unit tests shouldn't run when -Dhadoopversion=20 (cheolsoo)
 
 PIG-3986: PigSplit to support multiple split class (tongjie via cheolsoo)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1601782&r1=1601781&r2=1601782&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Jun 10 22:32:27 2014
@@ -269,8 +269,8 @@ public class TezDagBuilder extends TezOp
         for (POLocalRearrangeTez lr : lrs) {
             if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
-                setIntermediateInputKeyValue(keyType, conf, to);
-                setIntermediateOutputKeyValue(keyType, conf, to);
+                setIntermediateInputKeyValue(keyType, conf, to, lr.isConnectedToPackage());
+                setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage());
                 // In case of secondary key sort, main key type is the actual key type
                 conf.set("pig.reduce.key.type", Byte.toString(lr.getMainKeyType()));
                 break;
@@ -801,12 +801,12 @@ public class TezDagBuilder extends TezOp
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
-            throws JobCreationException, ExecException {
-        if (tezOp != null && tezOp.isUseSecondaryKey()) {
+    private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp,
+            boolean isConnectedToPackage) throws JobCreationException, ExecException {
+        if (tezOp != null && tezOp.isUseSecondaryKey() && isConnectedToPackage) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
                     NullableTuple.class.getName());
-        } else if (tezOp != null && tezOp.isSkewedJoin()) {
+        } else if (tezOp != null && tezOp.isSkewedJoin() && isConnectedToPackage) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
                     NullablePartitionWritable.class.getName());
         } else {
@@ -821,12 +821,17 @@ public class TezDagBuilder extends TezOp
                 NullableTuple.class.getName());
     }
 
-    @SuppressWarnings("rawtypes")
-    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
+    private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException, ExecException {
+        setIntermediateInputKeyValue(keyType, conf, tezOp, true);
+    }
+
+    @SuppressWarnings("rawtypes")
+    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp,
+            boolean isConnectedToPackage) throws JobCreationException, ExecException {
         Class<? extends WritableComparable> keyClass = HDataType
                 .getWritableComparableTypes(keyType).getClass();
-        if (tezOp != null && tezOp.isSkewedJoin()) {
+        if (tezOp != null && tezOp.isSkewedJoin() && isConnectedToPackage) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
                     NullablePartitionWritable.class.getName());
         } else {
@@ -840,6 +845,11 @@ public class TezDagBuilder extends TezOp
         selectOutputComparator(keyType, conf, tezOp);
     }
 
+    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
+            throws JobCreationException, ExecException {
+        setIntermediateOutputKeyValue(keyType, conf, tezOp, true);
+    }
+
     private static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean hasOrderBy)
             throws JobCreationException {
 



Mime
View raw message