pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1738662 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/ src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/
Date Mon, 11 Apr 2016 23:00:05 GMT
Author: rohini
Date: Mon Apr 11 23:00:05 2016
New Revision: 1738662

URL: http://svn.apache.org/viewvc?rev=1738662&view=rev
Log:
PIG-4853: Fetch inputs before starting outputs

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 11 23:00:05 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4853: Fetch inputs before starting outputs (rohini)
+
 PIG-4847: POPartialAgg processing and spill improvements (rohini)
 
 PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of no vertex
groups (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
Mon Apr 11 23:00:05 2016
@@ -56,6 +56,7 @@ public class POCounterStatsTez extends P
     private transient KeyValuesReader reader;
     private transient KeyValueWriter writer;
     private transient boolean finished = false;
+    private transient boolean hasNext = false;
 
     public POCounterStatsTez(OperatorKey k) {
         super(k);
@@ -88,6 +89,7 @@ public class POCounterStatsTez extends P
         try {
             reader = (KeyValuesReader) input.getReader();
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ",
reader=" + reader);
+            hasNext = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -130,12 +132,13 @@ public class POCounterStatsTez extends P
             Integer key = null;
             Long value = null;
             // Read count of records per task
-            while (reader.next()) {
+            while (hasNext) {
                 key = ((IntWritable)reader.getCurrentKey()).get();
                 for (Object val : reader.getCurrentValues()) {
                     value = ((LongWritable)val).get();
                     counterRecords.put(key, value);
                 }
+                hasNext = reader.next();
             }
 
             // BinInterSedes only takes String for map key

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
Mon Apr 11 23:00:05 2016
@@ -101,9 +101,13 @@ public class POFRJoinTez extends POFRJoi
                 LogicalInput input = inputs.get(key);
                 if (!this.replInputs.contains(input)) {
                     this.replInputs.add(input);
-                    this.replReaders.add((KeyValueReader) input.getReader());
+                    KeyValueReader reader = (KeyValueReader) input.getReader();
+                    this.replReaders.add(reader);
+                    log.info("Attached input from vertex " + key + " : input=" + input +
", reader=" + reader);
                 }
             }
+            // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4
have
+            // multiple POFRJoinTez loading same replicate input and will skip records
         } catch (Exception e) {
             throw new ExecException(e);
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
Mon Apr 11 23:00:05 2016
@@ -57,6 +57,7 @@ public class POIdentityInOutTez extends
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean finished = false;
+    private transient boolean hasNext = false;
 
     public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey)
{
         super(inputRearrange);
@@ -95,9 +96,12 @@ public class POIdentityInOutTez extends
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasNext = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
+                hasNext = shuffleReader.next();
             }
             LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ",
reader=" + r);
         } catch (Exception e) {
@@ -127,7 +131,7 @@ public class POIdentityInOutTez extends
                 return RESULT_EOP;
             }
             if (shuffleInput) {
-                while (shuffleReader.next()) {
+                while (hasNext) {
                     Object curKey = shuffleReader.getCurrentKey();
                     Iterable<Object> vals = shuffleReader.getCurrentValues();
                     if (isSkewedJoin) {
@@ -139,9 +143,10 @@ public class POIdentityInOutTez extends
                     for (Object val : vals) {
                         writer.write(curKey, val);
                     }
+                    hasNext = shuffleReader.next();
                 }
             } else {
-                while (reader.next()) {
+                while (hasNext) {
                     if (isSkewedJoin) {
                         NullablePartitionWritable wrappedKey = new NullablePartitionWritable(
                                 (PigNullableWritable) reader.getCurrentKey());
@@ -155,6 +160,7 @@ public class POIdentityInOutTez extends
                         writer.write(reader.getCurrentKey(),
                                 reader.getCurrentValue());
                     }
+                    hasNext = reader.next();
                 }
             }
             finished = true;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
Mon Apr 11 23:00:05 2016
@@ -51,6 +51,7 @@ public class PORankTez extends PORank im
     private transient Map<Integer, Long> counterOffsets;
     private transient Configuration conf;
     private transient boolean finished = false;
+    private transient Boolean hasFirstRecord;
 
     public PORankTez(PORank copy) {
         super(copy);
@@ -100,6 +101,7 @@ public class PORankTez extends PORank im
         try {
             reader = (KeyValueReader) input.getReader();
             LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input
+ ", reader=" + reader);
+            hasFirstRecord = reader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -140,9 +142,18 @@ public class PORankTez extends PORank im
         Result inp = null;
 
         try {
-            while (reader.next()) {
-                inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
-                return addRank(inp);
+            if (hasFirstRecord != null) {
+                if (hasFirstRecord) {
+                    hasFirstRecord = null;
+                    inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                    return addRank(inp);
+                }
+                hasFirstRecord = null;
+            } else {
+                while (reader.next()) {
+                    inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
+                    return addRank(inp);
+                }
             }
         } catch (IOException e) {
             throw new ExecException(e);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
Mon Apr 11 23:00:05 2016
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -48,6 +50,7 @@ import org.apache.tez.runtime.library.co
 public class POShuffleTezLoad extends POPackage implements TezInput {
 
     private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
 
     protected List<String> inputKeys = new ArrayList<String>();
     private boolean isSkewedJoin = false;
@@ -101,7 +104,10 @@ public class POShuffleTezLoad extends PO
                 //     - Input key will be repeated, but index would be same within a TezInput
                 if (!this.inputs.contains(input)) {
                     this.inputs.add(input);
-                    this.readers.add((KeyValuesReader)input.getReader());
+                    KeyValuesReader reader = (KeyValuesReader)input.getReader();
+                    this.readers.add(reader);
+                    LOG.info("Attached input from vertex " + inputKey
+                            + " : input=" + input + ", reader=" + reader);
                 }
             }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
Mon Apr 11 23:00:05 2016
@@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext
     private transient Iterator<KeyValueReader> readers;
     private transient KeyValueReader currentReader;
     private transient Configuration conf;
+    private transient Boolean hasFirstRecord;
 
     public POShuffledValueInputTez(OperatorKey k) {
         super(k);
@@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext
             }
             readers = readersList.iterator();
             currentReader = readers.next();
+            // Force input fetch
+            hasFirstRecord = currentReader.next();
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext
             }
 
             do {
-                if (currentReader.next()) {
+                if (hasFirstRecord != null) {
+                    if (hasFirstRecord) {
+                        hasFirstRecord = null;
+                        Tuple origTuple = (Tuple) currentReader.getCurrentValue();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasFirstRecord = null;
+                } else if (currentReader.next()) {
                     Tuple origTuple = (Tuple) currentReader.getCurrentValue();
                     Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
                     return new Result(POStatus.STATUS_OK, copy);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
Mon Apr 11 23:00:05 2016
@@ -57,6 +57,7 @@ public class POValueInputTez extends Phy
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean hasNext;
+    private transient Boolean hasFirstRecord;
 
     public POValueInputTez(OperatorKey k) {
         super(k);
@@ -92,6 +93,8 @@ public class POValueInputTez extends Phy
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasFirstRecord = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
@@ -118,10 +121,22 @@ public class POValueInputTez extends Phy
                     }
                     hasNext = shuffleReader.next();
                 }
-            } else if (reader.next()) {
-                Tuple origTuple = (Tuple) reader.getCurrentValue();
-                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-                return new Result(POStatus.STATUS_OK, copy);
+            } else {
+                if (hasFirstRecord != null) {
+                    if (hasFirstRecord) {
+                        hasFirstRecord = null;
+                        Tuple origTuple = (Tuple) reader.getCurrentValue();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                    hasFirstRecord = null;
+                } else {
+                    while (reader.next()) {
+                        Tuple origTuple = (Tuple) reader.getCurrentValue();
+                        Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                        return new Result(POStatus.STATUS_OK, copy);
+                    }
+                }
             }
             finished = true;
             // For certain operators (such as STREAM), we could still have some work

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
Mon Apr 11 23:00:05 2016
@@ -43,6 +43,15 @@ public interface TezInput {
      */
     public void addInputsToSkip(Set<String> inputsToSkip);
 
+    /**
+     * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch
+     * the input so that all inputs are fetched and memory released before memory is allocated
+     * for outputs
+     *
+     * @param inputs available inputs
+     * @param conf configuration
+     * @throws ExecException
+     */
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException;
 



Mime
View raw message