pig-commits mailing list archives

From xu...@apache.org
Subject svn commit: r1733627 [5/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/main...
Date Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Fri Mar  4 18:17:39 2016
@@ -41,7 +41,6 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.SelfSpillBag.MemoryLimits;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.GroupingSpillable;
@@ -68,14 +67,23 @@ public class POPartialAgg extends Physic
     // entry in hash map and average seen reduction
     private static final int NUM_RECS_TO_SAMPLE = 10000;
 
+    // We want to allow bigger list sizes for group all.
+    // But still cap it so the JVM does not struggle to allocate space.
+    // TODO: How high can we go without performance degradation?
+    private static final int MAX_LIST_SIZE = 25000;
+
     // We want to avoid massive ArrayList copies as they get big.
     // Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
     // 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
     // large value to trigger spilling/aggregation instead of paying for yet another data
     // copy.
-    private static final int MAX_LIST_SIZE = 9368;
+    // For group all cases, we will set this to a higher value
+    private int listSizeThreshold = 9367;
 
-    private static final int DEFAULT_MIN_REDUCTION = 10;
+    // Use a default min reduction of 7 instead of 10, as the processedInputMap
+    // size will be 4096 (HashMap sizes are powers of 2) for both 20000/10 and 20000/7.
+    // Even a 7x reduction is worth map-side aggregation, so use the lower 7.
+    private static final int DEFAULT_MIN_REDUCTION = 7;
 
     // TODO: these are temporary. The real thing should be using memory usage estimation.
     private static final int FIRST_TIER_THRESHOLD = 20000;
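The MAX_LIST_SIZE / listSizeThreshold values above follow directly from ArrayList's growth rule (newCapacity = oldCapacity + oldCapacity/2). A quick sketch, not part of this commit, reproduces the resize sequence assuming the default initial capacity of 10:

    // Sketch only: ArrayList's 1.5x growth arithmetic, as referenced in the
    // comment above (newCapacity = oldCapacity + (oldCapacity >> 1)).
    public class ArrayListGrowth {
        public static void main(String[] args) {
            int capacity = 10; // ArrayList's default initial capacity
            int resizes = 0;
            while (capacity < 25000) { // the MAX_LIST_SIZE cap used above
                System.out.println(resizes + " resizes -> capacity " + capacity);
                capacity += capacity >> 1; // same arithmetic as ArrayList.grow()
                resizes++;
            }
        }
    }

The sequence passes through 9369, which is why the previous threshold sat just below it at 9368.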
@@ -83,12 +91,11 @@ public class POPartialAgg extends Physic
 
     private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>();
 
-    private static final TupleFactory TF = TupleFactory.getInstance();
-
     private PhysicalPlan keyPlan;
     private ExpressionOperator keyLeaf;
     private List<PhysicalPlan> valuePlans;
     private List<ExpressionOperator> valueLeaves;
+    private boolean isGroupAll;
 
     private transient int numRecsInRawMap;
     private transient int numRecsInProcessedMap;
@@ -112,6 +119,7 @@ public class POPartialAgg extends Physic
     // rather than just spilling records to disk.
     private transient volatile boolean doSpill;
     private transient volatile boolean doContingentSpill;
+    private transient volatile boolean startedContingentSpill;
     private transient volatile Object spillLock;
 
     private transient int minOutputReduction;
@@ -123,17 +131,19 @@ public class POPartialAgg extends Physic
     private transient int avgTupleSize;
     private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator;
 
-
     public POPartialAgg(OperatorKey k) {
+        this(k, false);
+    }
+
+    public POPartialAgg(OperatorKey k, boolean isGroupAll) {
         super(k);
+        this.isGroupAll = isGroupAll;
     }
 
     private void init() throws ExecException {
         ALL_POPARTS.put(this, null);
         numRecsInRawMap = 0;
         numRecsInProcessedMap = 0;
-        rawInputMap = Maps.newHashMap();
-        processedInputMap = Maps.newHashMap();
         minOutputReduction = DEFAULT_MIN_REDUCTION;
         numRecordsToSample = NUM_RECS_TO_SAMPLE;
         firstTierThreshold = FIRST_TIER_THRESHOLD;
@@ -158,11 +168,24 @@ public class POPartialAgg extends Physic
         }
         if (percentUsage <= 0) {
             LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
-            disableMapAgg();
+            disableMapAgg = true;
             // Set them to true instead of adding another check for !disableMapAgg
             sizeReductionChecked = true;
             estimatedMemThresholds = true;
         }
+        // Avoid hashmap resizing. TODO: Investigate load factor of 0.90 or 1.0
+        // newHashMapWithExpectedSize does new HashMap(expectedSize + expectedSize/3)
+        // to factor in the default load factor of 0.75.
+        // A HashMap's internal table size is always a power of 2.
+        // So for NUM_RECS_TO_SAMPLE=10000, the hashmap size will be 16384.
+        // With secondTierThreshold of 2857 (minReduction 7), it will be 4096.
+        if (!disableMapAgg) {
+            rawInputMap = Maps.newHashMapWithExpectedSize(NUM_RECS_TO_SAMPLE);
+            processedInputMap = Maps.newHashMapWithExpectedSize(SECOND_TIER_THRESHOLD);
+        }
+        if (isGroupAll) {
+            listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
+        }
         initialized = true;
         SpillableMemoryManager.getInstance().registerSpillable(this);
     }
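The table sizes quoted in the comment above can be checked with a small sketch, not part of this commit. It assumes Guava's newHashMapWithExpectedSize computes initialCapacity = expectedSize + expectedSize/3 (as the comment states) and that HashMap rounds the capacity up to the next power of two:

    // Sketch only: mirrors the sizing arithmetic described in the comment above.
    public class HashMapSizing {
        // Smallest power of two >= cap, like HashMap's internal tableSizeFor().
        static int tableSizeFor(int cap) {
            return (cap <= 1) ? 1 : Integer.highestOneBit(cap - 1) << 1;
        }

        public static void main(String[] args) {
            // NUM_RECS_TO_SAMPLE and SECOND_TIER_THRESHOLD (20000/7 = 2857)
            for (int expected : new int[] {10000, 2857}) {
                int initialCapacity = expected + expected / 3;
                System.out.println(expected + " -> table size " + tableSizeFor(initialCapacity));
            }
        }
    }

This prints 16384 and 4096, matching the sizes in the comment.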
@@ -196,6 +219,7 @@ public class POPartialAgg extends Physic
                 estimateMemThresholds();
             }
             if (doContingentSpill) {
+                startedContingentSpill = true;
                 // Don't aggregate if spilling. Avoid concurrent update of spilling iterator.
                 if (doSpill == false) {
                     // SpillableMemoryManager requested a spill to reduce memory
@@ -216,14 +240,17 @@ public class POPartialAgg extends Physic
                     doSpill = false;
                     doContingentSpill = false;
                 }
-                if (result.returnStatus != POStatus.STATUS_EOP
-                        || inputsExhausted) {
+                if (result.returnStatus != POStatus.STATUS_EOP) {
+                    return result;
+                } else if (inputsExhausted) {
+                    freeMemory();
                     return result;
                 }
             }
             if (mapAggDisabled()) {
                 // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
                 // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
+                freeMemory();
                 return processInput();
             } else {
                 Result inp = processInput();
@@ -265,6 +292,15 @@ public class POPartialAgg extends Physic
         }
     }
 
+    private void freeMemory() throws ExecException {
+        if (rawInputMap != null && !rawInputMap.isEmpty()) {
+            throw new ExecException("Illegal state. Trying to free up partial aggregation maps when they are not empty");
+        }
+        // Free up the maps for garbage collection
+        rawInputMap = null;
+        processedInputMap = null;
+    }
+
     private void estimateMemThresholds() {
         if (!mapAggDisabled()) {
             LOG.info("Getting mem limits; considering " + ALL_POPARTS.size()
@@ -294,6 +330,9 @@ public class POPartialAgg extends Physic
                 secondTierThreshold += 1;
                 firstTierThreshold -= 1;
             }
+            if (isGroupAll) {
+                listSizeThreshold = Math.min(firstTierThreshold, MAX_LIST_SIZE);
+            }
         }
         estimatedMemThresholds = true;
     }
@@ -344,17 +383,26 @@ public class POPartialAgg extends Physic
             Object key, Tuple inpTuple) throws ExecException {
         List<Tuple> value = map.get(key);
         if (value == null) {
-            value = new ArrayList<Tuple>();
+            if (isGroupAll) {
+                // Set exact array initial size to avoid array copies
+                // listSizeThreshold = numRecordsToSample before estimating memory
+                // thresholds and firstTierThreshold after memory estimation
+                int listSize = (map == rawInputMap) ? listSizeThreshold : Math.min(secondTierThreshold, MAX_LIST_SIZE);
+                value = new ArrayList<Tuple>(listSize);
+            } else {
+                value = new ArrayList<Tuple>();
+            }
             map.put(key, value);
         }
         value.add(inpTuple);
-        if (value.size() >= MAX_LIST_SIZE) {
+        if (value.size() > listSizeThreshold) {
             boolean isFirst = (map == rawInputMap);
             if (LOG.isDebugEnabled()){
                 LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
             }
             if (isFirst) {
-                aggregateRawRow(key);
+                // Aggregate and remove just this key to keep size in check
+                aggregateRawRow(key, value);
             } else {
                 aggregateSecondLevel();
             }
@@ -367,7 +415,7 @@ public class POPartialAgg extends Physic
 
         LOG.info("Starting spill.");
         if (aggregate) {
-            aggregateBothLevels(false, true);
+            aggregateBothLevels(false, false);
         }
         doSpill = true;
         spillingIterator = processedInputMap.entrySet().iterator();
@@ -389,8 +437,8 @@ public class POPartialAgg extends Physic
         }
     }
 
-    private void aggregateRawRow(Object key) throws ExecException {
-        List<Tuple> value = rawInputMap.get(key);
+    private void aggregateRawRow(Object key, List<Tuple> value) throws ExecException {
+        numRecsInRawMap -= value.size();
         Tuple valueTuple = createValueTuple(key, value);
         Result res = getOutput(key, valueTuple);
         rawInputMap.remove(key);
@@ -454,7 +502,7 @@ public class POPartialAgg extends Physic
     }
 
     private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException {
-        Tuple valueTuple = TF.newTuple(valuePlans.size() + 1);
+        Tuple valueTuple = mTupleFactory.newTuple(valuePlans.size() + 1);
         valueTuple.set(0, key);
 
         for (int i = 0; i < valuePlans.size(); i++) {
@@ -534,7 +582,7 @@ public class POPartialAgg extends Physic
      * @throws ExecException
      */
     private Result getOutput(Object key, Tuple value) throws ExecException {
-        Tuple output = TF.newTuple(valuePlans.size() + 1);
+        Tuple output = mTupleFactory.newTuple(valuePlans.size() + 1);
         output.set(0, key);
 
         for (int i = 0; i < valuePlans.size(); i++) {
@@ -591,21 +639,49 @@ public class POPartialAgg extends Physic
         if (mapAggDisabled()) {
             return 0;
         } else {
+            if (doContingentSpill && !startedContingentSpill) {
+                LOG.info("Spill triggered by SpillableMemoryManager, but previous spill call is still not processed. Skipping");
+                return 0;
+            }
             LOG.info("Spill triggered by SpillableMemoryManager");
-            doContingentSpill = true;
             synchronized(spillLock) {
-                if (!sizeReductionChecked) {
+                if (rawInputMap != null) {
+                    LOG.info("Memory usage: " + getMemorySize()
+                            + ". Raw map: num keys = " + rawInputMap.size()
+                            + ", num tuples = "+ numRecsInRawMap
+                            + ", Processed map: num keys = " + processedInputMap.size()
+                            + ", num tuples = "+ numRecsInProcessedMap );
+                }
+                startedContingentSpill = false;
+                doContingentSpill = true;
+                if (!sizeReductionChecked || !estimatedMemThresholds) {
                     numRecordsToSample = numRecsInRawMap;
                 }
                 try {
+                    // Block till spilling is finished. If the main thread has not reached
+                    // POPartialAgg and is still processing the lower pipeline after 5 seconds,
+                    // the JVM is likely stuck doing GC and will soon fail with
+                    // java.lang.OutOfMemoryError: GC overhead limit exceeded.
+                    // So exit here so that SpillableMemoryManager can at least spill other
+                    // Spillable bags and free up memory for user code to run and reach
+                    // POPartialAgg, where the hashmaps can be aggregated/spilled.
+                    long startTime = System.currentTimeMillis();
                     while (doContingentSpill == true) {
-                        Thread.sleep(50); //Keeping it on the lower side for now. Tune later
+                        Thread.sleep(25);
+                        if (!startedContingentSpill && (System.currentTimeMillis() - startTime) >= 5000) {
+                            break;
+                        }
+                    }
+                    if (doContingentSpill) {
+                        LOG.info("Not blocking for spill and letting SpillableMemoryManager"
+                                + " process other spillable objects as main thread has not reached here for 5 secs");
+                    } else {
+                        LOG.info("Finished spill for SpillableMemoryManager call");
+                        return 1;
                     }
                 } catch (InterruptedException e) {
                     LOG.warn("Interrupted exception while waiting for spill to finish", e);
                 }
-                LOG.info("Finished spill for SpillableMemoryManager call");
-                return 1;
+                return 0;
             }
         }
     }
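The reworked spill() above is a handshake between the SpillableMemoryManager thread and the processing thread: the manager raises doContingentSpill and polls, then gives up after ~5 seconds if the processing thread never acknowledges via startedContingentSpill. A minimal sketch of that pattern, not part of this commit, with hypothetical names and two threads sharing volatile flags:

    // Sketch only: the two-flag spill handshake, reduced to its essentials.
    public class SpillHandshake {
        private volatile boolean spillRequested;  // cf. doContingentSpill
        private volatile boolean spillStarted;    // cf. startedContingentSpill

        // Memory-manager thread: request a spill and wait, but bounded.
        long requestSpill() throws InterruptedException {
            spillStarted = false;
            spillRequested = true;
            long start = System.currentTimeMillis();
            while (spillRequested) {
                Thread.sleep(25);
                // If the worker never acknowledged within 5s, it is likely
                // stuck below this operator (e.g. in GC); stop blocking so
                // other spillables can be spilled instead.
                if (!spillStarted && System.currentTimeMillis() - start >= 5000) {
                    return 0;
                }
            }
            return 1; // spill completed
        }

        // Worker thread: called when it reaches the operator.
        void onWorkerTurn() {
            if (spillRequested) {
                spillStarted = true;
                // ... aggregate and drain the maps ...
                spillRequested = false;
            }
        }
    }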
@@ -615,4 +691,14 @@ public class POPartialAgg extends Physic
         return avgTupleSize * (numRecsInProcessedMap + numRecsInRawMap);
     }
 
+    @Override
+    public PhysicalOperator clone() throws CloneNotSupportedException {
+        POPartialAgg clone = (POPartialAgg) super.clone();
+        clone.setKeyPlan(keyPlan.clone());
+        clone.setValuePlans(clonePlans(valuePlans));
+        return clone;
+    }
+
+
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartitionRearrange.java Fri Mar  4 18:17:39 2016
@@ -30,7 +30,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -39,8 +38,6 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 
-import com.google.common.collect.Maps;
-
 /**
  * The partition rearrange operator is a part of the skewed join
  * implementation. It has an embedded physical plan that
@@ -50,12 +47,11 @@ import com.google.common.collect.Maps;
 public class POPartitionRearrange extends POLocalRearrange {
 
     private static final long serialVersionUID = 1L;
-    private static final BagFactory mBagFactory = BagFactory.getInstance();
 
-    private Integer totalReducers = -1;
+    private transient Integer totalReducers;
     // ReducerMap will store the tuple, max reducer index & min reducer index
-    private Map<Object, Pair<Integer, Integer> > reducerMap = Maps.newHashMap();
-    private boolean inited;
+    private transient Map<Object, Pair<Integer, Integer> > reducerMap;
+    private transient boolean inited;
 
     private PigContext pigContext;
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Mar  4 18:17:39 2016
@@ -23,7 +23,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -32,51 +31,48 @@ public class POPoissonSample extends Phy
 
     private static final long serialVersionUID = 1L;
 
-    private static final TupleFactory tf = TupleFactory.getInstance();
-    private static Result eop = new Result(POStatus.STATUS_EOP, null);
+    // 17 is not a magic number. It can be obtained by using a Poisson
+    // cumulative distribution function with the mean set to 10 (empirically,
+    // minimum number of samples) and the confidence set to 95%
+    public static final int DEFAULT_SAMPLE_RATE = 17;
+
+    private int sampleRate = 0;
+
+    private float heapPerc = 0f;
+
+    private Long totalMemory;
+
+    private transient boolean initialized;
 
     // num of rows sampled so far
-    private int numRowsSampled = 0;
+    private transient int numRowsSampled;
 
     // average size of tuple in memory, for tuples sampled
-    private long avgTupleMemSz = 0;
+    private transient long avgTupleMemSz;
 
     // current row number
-    private long rowNum = 0;
+    private transient long rowNum;
 
     // number of tuples to skip after each sample
-    private long skipInterval = -1;
+    private transient long skipInterval;
 
     // bytes in input to skip after every sample.
     // divide this by avgTupleMemSize to get skipInterval
-    private long memToSkipPerSample = 0;
+    private transient long memToSkipPerSample;
 
     // has the special row with row number information been returned
-    private boolean numRowSplTupleReturned = false;
-
-    // 17 is not a magic number. It can be obtained by using a poisson
-    // cumulative distribution function with the mean set to 10 (empirically,
-    // minimum number of samples) and the confidence set to 95%
-    public static final int DEFAULT_SAMPLE_RATE = 17;
-
-    private int sampleRate = 0;
-
-    private float heapPerc = 0f;
+    private transient boolean numRowSplTupleReturned;
 
     // new Sample result
-    private Result newSample = null;
+    private transient Result newSample;
 
-    public POPoissonSample(OperatorKey k, int rp, int sr, float hp) {
+    public POPoissonSample(OperatorKey k, int rp, int sr, float hp, long tm) {
         super(k, rp, null);
-        numRowsSampled = 0;
-        avgTupleMemSz = 0;
-        rowNum = 0;
-        skipInterval = -1;
-        memToSkipPerSample = 0;
-        numRowSplTupleReturned = false;
-        newSample = null;
         sampleRate = sr;
         heapPerc = hp;
+        if (tm != -1) {
+            totalMemory = tm;
+        }
     }
 
     @Override
@@ -92,10 +88,22 @@ public class POPoissonSample extends Phy
 
     @Override
     public Result getNextTuple() throws ExecException {
+        if (!initialized) {
+            numRowsSampled = 0;
+            avgTupleMemSz = 0;
+            rowNum = 0;
+            skipInterval = -1;
+            memToSkipPerSample = 0;
+            if (totalMemory == null) {
+                // Initialize in backend to get memory of task
+                totalMemory = Runtime.getRuntime().maxMemory();
+            }
+            initialized = true;
+        }
         if (numRowSplTupleReturned) {
             // row num special row has been returned after all inputs
             // were read, nothing more to read
-            return eop;
+            return RESULT_EOP;
         }
 
         Result res = null;
@@ -107,11 +115,7 @@ public class POPoissonSample extends Phy
                 if (res.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 } else if (res.returnStatus == POStatus.STATUS_EOP) {
-                    if (this.parentPlan.endOfAllInput) {
-                        return eop;
-                    } else {
-                        continue;
-                    }
+                    return res;
                 } else if (res.returnStatus == POStatus.STATUS_ERR) {
                     return res;
                 }
@@ -119,7 +123,7 @@ public class POPoissonSample extends Phy
                 if (res.result == null) {
                     continue;
                 }
-                long availRedMem = (long) (Runtime.getRuntime().maxMemory() * heapPerc);
+                long availRedMem = (long) (totalMemory * heapPerc);
                 memToSkipPerSample = availRedMem/sampleRate;
                 updateSkipInterval((Tuple)res.result);
 
@@ -215,7 +219,7 @@ public class POPoissonSample extends Phy
      */
     private Result createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
-        Tuple t = tf.newTuple(sz + 2);
+        Tuple t = mTupleFactory.newTuple(sz + 2);
 
         if (sample != null) {
             for (int i=0; i<sample.size(); i++){
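For reference, the skip-interval arithmetic that the new totalMemory field feeds (availRedMem = totalMemory * heapPerc; memToSkipPerSample = availRedMem / sampleRate; skip interval = memToSkipPerSample / average tuple size) works out as below. A sketch, not part of this commit, with made-up inputs:

    // Sketch only: POPoissonSample's skip-interval arithmetic with
    // illustrative numbers (1 GB heap, 1% for sampling, 200-byte tuples).
    public class PoissonSkipInterval {
        public static void main(String[] args) {
            long totalMemory = 1024L * 1024 * 1024; // from Runtime.maxMemory()
            float heapPerc = 0.01f;                 // heap fraction for samples
            int sampleRate = 17;                    // DEFAULT_SAMPLE_RATE
            long avgTupleMemSz = 200;               // running average tuple size

            long availRedMem = (long) (totalMemory * heapPerc);
            long memToSkipPerSample = availRedMem / sampleRate;
            System.out.println("skip " + (memToSkipPerSample / avgTupleMemSz)
                    + " tuples between samples");
        }
    }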

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Fri Mar  4 18:17:39 2016
@@ -29,12 +29,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.SingleTupleBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -53,9 +51,6 @@ public class POPreCombinerLocalRearrange
 
     protected static final long serialVersionUID = 1L;
 
-    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
-    protected static BagFactory mBagFactory = BagFactory.getInstance();
-
     private static final Result ERR_RESULT = new Result();
 
     protected List<PhysicalPlan> plans;
@@ -223,4 +218,12 @@ public class POPreCombinerLocalRearrange
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
+
+    @Override
+    public POPreCombinerLocalRearrange clone() throws CloneNotSupportedException {
+        POPreCombinerLocalRearrange clone = (POPreCombinerLocalRearrange) super.clone();
+        clone.leafOps = new ArrayList<ExpressionOperator>();
+        clone.setPlans(clonePlans(plans));
+        return clone;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORank.java Fri Mar  4 18:17:39 2016
@@ -32,7 +32,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
@@ -55,8 +54,6 @@ public class PORank extends PhysicalOper
     private List<Boolean> mAscCols;
     private List<Byte> ExprOutputTypes;
 
-    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
-
     /**
      * Unique identifier that links POCounter and PORank,
      * through the global counter labeled with it.
@@ -230,4 +227,11 @@ public class PORank extends PhysicalOper
     public String getOperationID() {
         return operationID;
     }
+
+    @Override
+    public PORank clone() throws CloneNotSupportedException {
+        PORank clone = (PORank)super.clone();
+        // rankPlans, mAscCols, ExprOutputTypes are unused. Not cloning them
+        return clone;
+    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Mar  4 18:17:39 2016
@@ -26,31 +26,28 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POReservoirSample extends PhysicalOperator {
 
-    private static final TupleFactory tf = TupleFactory.getInstance();
-
     private static final long serialVersionUID = 1L;
 
     // number of samples to be sampled
     protected int numSamples;
 
-    private transient int nextSampleIdx= 0;
+    private transient int nextSampleIdx = 0;
 
-    private int rowProcessed = 0;
+    private transient int rowProcessed = 0;
 
-    private boolean sampleCollectionDone = false;
+    private transient boolean sampleCollectionDone = false;
 
     //array to store the result
     private transient Result[] samples = null;
 
     // last sample result
-    private Result lastSample = null;
+    private transient Result lastSample = null;
 
     public POReservoirSample(OperatorKey k) {
         this(k, -1, null);
@@ -103,12 +100,20 @@ public class POReservoirSample extends P
                 rowProcessed++;
             } else if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
+            } else if (res.returnStatus == POStatus.STATUS_EOP) {
+                if (this.parentPlan.endOfAllInput) {
+                    break;
+                } else {
+                    // In case of Split, we can get an EOP in between.
+                    // Return here instead of setting lastSample to EOP in getSample
+                    return res;
+                }
             } else {
                 break;
             }
         }
 
-        if (res.returnStatus != POStatus.STATUS_EOP) {
+        if (res == null || res.returnStatus != POStatus.STATUS_EOP) {
             Random randGen = new Random();
             while (true) {
                 // pick this as sample
@@ -142,7 +147,7 @@ public class POReservoirSample extends P
         if (lastSample.returnStatus==POStatus.STATUS_EOP) {
             return lastSample;
         }
-        
+
         Result currentSample = retrieveSample();
         // If this is the last sample, tag with number of rows
         if (currentSample.returnStatus == POStatus.STATUS_EOP) {
@@ -156,20 +161,18 @@ public class POReservoirSample extends P
     }
 
     private Result retrieveSample() throws ExecException {
-        if(nextSampleIdx < samples.length){
+        if(nextSampleIdx < Math.min(rowProcessed, samples.length)){
             if (illustrator != null) {
                 illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0);
             }
             Result res = samples[nextSampleIdx++];
             if (res == null) { // Input data has lesser rows than numSamples
-                return new Result(POStatus.STATUS_NULL, null);
+                return RESULT_EMPTY;
             }
             return res;
         }
         else{
-            Result res;
-            res = new Result(POStatus.STATUS_EOP, null);
-            return res;
+            return RESULT_EOP;
         }
     }
 
@@ -195,7 +198,7 @@ public class POReservoirSample extends P
      */
     private Result createNumRowTuple(Tuple sample) throws ExecException {
         int sz = (sample == null) ? 0 : sample.size();
-        Tuple t = tf.newTuple(sz + 2);
+        Tuple t = mTupleFactory.newTuple(sz + 2);
 
         if (sample != null) {
             for (int i=0; i<sample.size(); i++){
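Context for the retrieveSample() bound fix above: the samples array is allocated at numSamples up front, so when the input has fewer rows than numSamples the tail of the array stays null. A minimal reservoir-sampling (algorithm R) sketch, not part of this commit, shows that shape:

    import java.util.Random;

    // Sketch only: classic algorithm R; with fewer rows than numSamples the
    // trailing reservoir slots are never filled.
    public class Reservoir {
        public static String[] sample(Iterable<String> rows, int numSamples) {
            String[] samples = new String[numSamples];
            Random rand = new Random();
            int rowProcessed = 0;
            for (String row : rows) {
                if (rowProcessed < numSamples) {
                    samples[rowProcessed] = row;          // fill phase
                } else {
                    int idx = rand.nextInt(rowProcessed + 1);
                    if (idx < numSamples) {
                        samples[idx] = row;               // replace phase
                    }
                }
                rowProcessed++;
            }
            return samples; // only min(rowProcessed, numSamples) slots are set
        }
    }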

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Fri Mar  4 18:17:39 2016
@@ -36,12 +36,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -74,11 +72,11 @@ public class POSort extends PhysicalOper
 	private POUserComparisonFunc mSortFunc;
 	private Comparator<Tuple> mComparator;
 
-	private boolean inputsAccumulated = false;
 	private long limit;
 	public boolean isUDFComparatorUsed = false;
-	private DataBag sortedBag;
 
+	private transient boolean inputsAccumulated = false;
+	private transient DataBag sortedBag;
     private transient Iterator<Tuple> it;
     private transient boolean initialized;
     private transient boolean useDefaultBag;
@@ -95,22 +93,22 @@ public class POSort extends PhysicalOper
 		this.sortPlans = sortPlans;
 		this.mAscCols = mAscCols;
         this.limit = -1;
-		this.mSortFunc = mSortFunc;
-		if (mSortFunc == null) {
+        setSortFunc(mSortFunc);
+	}
+
+	private void setSortFunc(POUserComparisonFunc mSortFunc) {
+	    this.mSortFunc = mSortFunc;
+        if (mSortFunc == null) {
             mComparator = new SortComparator();
-			/*sortedBag = BagFactory.getInstance().newSortedBag(
-					new SortComparator());*/
-			ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
+            ExprOutputTypes = new ArrayList<Byte>(sortPlans.size());
 
-			for(PhysicalPlan plan : sortPlans) {
-				ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
-			}
-		} else {
-			/*sortedBag = BagFactory.getInstance().newSortedBag(
-					new UDFSortComparator());*/
+            for(PhysicalPlan plan : sortPlans) {
+                ExprOutputTypes.add(plan.getLeaves().get(0).getResultType());
+            }
+        } else {
             mComparator = new UDFSortComparator();
-			isUDFComparatorUsed = true;
-		}
+            isUDFComparatorUsed = true;
+        }
 	}
 
 	public POSort(OperatorKey k, int rp, List inp) {
@@ -256,10 +254,10 @@ public class POSort extends PhysicalOper
 
 	@Override
 	public Result getNextTuple() throws ExecException {
-		Result res = new Result();
+		Result inp;
 
 		if (!inputsAccumulated) {
-			res = processInput();
+			inp = processInput();
             if (!initialized) {
                 initialized = true;
                 if (PigMapReduce.sJobConfInternal.get() != null) {
@@ -271,26 +269,28 @@ public class POSort extends PhysicalOper
             }
 			// by default, we create InternalSortedBag, unless user configures
             // explicitly to use old bag
-            sortedBag = useDefaultBag ? BagFactory.getInstance().newSortedBag(mComparator)
+            sortedBag = useDefaultBag ? mBagFactory.newSortedBag(mComparator)
                     : new InternalSortedBag(3, mComparator);
 
-            while (res.returnStatus != POStatus.STATUS_EOP) {
-				if (res.returnStatus == POStatus.STATUS_ERR) {
+            while (inp.returnStatus != POStatus.STATUS_EOP) {
+				if (inp.returnStatus == POStatus.STATUS_ERR) {
 					log.error("Error in reading from the inputs");
-					return res;
-                } else if (res.returnStatus == POStatus.STATUS_NULL) {
+					return inp;
+                } else if (inp.returnStatus == POStatus.STATUS_NULL) {
                     // Ignore and read the next tuple.
-                    res = processInput();
+                    inp = processInput();
                     continue;
                 }
-				sortedBag.add((Tuple) res.result);
-				res = processInput();
+				sortedBag.add((Tuple) inp.result);
+				inp = processInput();
             }
 
 			inputsAccumulated = true;
 
 		}
-		if (it == null) {
+
+        Result res = new Result();
+        if (it == null) {
             it = sortedBag.iterator();
         }
         if (it.hasNext()) {
@@ -301,7 +301,7 @@ public class POSort extends PhysicalOper
             res.returnStatus = POStatus.STATUS_EOP;
             reset();
         }
-		return res;
+        return res;
 	}
 
 	@Override
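POSort is an accumulating operator: the getNextTuple() rework above still drains the whole input into a sorted bag before serving tuples from its iterator. A reduced sketch of that accumulate-then-iterate shape, not part of this commit, with a plain List standing in for InternalSortedBag:

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;

    // Sketch only: accumulate everything, sort once, then iterate.
    class AccumulatingSort<T> {
        private final Comparator<T> cmp;
        private final List<T> bag = new ArrayList<T>();
        private Iterator<T> it;

        AccumulatingSort(Comparator<T> cmp) { this.cmp = cmp; }

        void add(T input) { bag.add(input); } // accumulate phase

        T next() {                            // iterate phase
            if (it == null) {
                bag.sort(cmp);
                it = bag.iterator();
            }
            return it.hasNext() ? it.next() : null; // null == end of output
        }
    }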
@@ -367,23 +367,19 @@ public class POSort extends PhysicalOper
 
     @Override
     public POSort clone() throws CloneNotSupportedException {
-        List<PhysicalPlan> clonePlans = new
-            ArrayList<PhysicalPlan>(sortPlans.size());
-        for (PhysicalPlan plan : sortPlans) {
-            clonePlans.add(plan.clone());
+        POSort clone = (POSort) super.clone();
+        clone.sortPlans = clonePlans(sortPlans);
+        if (mSortFunc == null) {
+            setSortFunc(null);
+        } else {
+            setSortFunc(mSortFunc.clone());
         }
         List<Boolean> cloneAsc = new ArrayList<Boolean>(mAscCols.size());
         for (Boolean b : mAscCols) {
             cloneAsc.add(b);
         }
-        POUserComparisonFunc cloneFunc = null;
-        if (mSortFunc != null) {
-            cloneFunc = mSortFunc.clone();
-        }
-        // Don't set inputs as PhysicalPlan.clone will take care of that
-        return new POSort(new OperatorKey(mKey.scope,
-            NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
-            requestedParallelism, null, clonePlans, cloneAsc, cloneFunc);
+        clone.mAscCols = cloneAsc;
+        return clone;
     }
 
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Fri Mar  4 18:17:39 2016
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -32,9 +31,6 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * The MapReduce Split operator.
@@ -49,7 +45,7 @@ import org.apache.pig.pen.util.LineageTr
  * as outputs of this operator using the conditions
  * specified in the LOSplit. So LOSplit will be converted
  * into:
- * 
+ *
  *     |        |           |
  *  Filter1  Filter2 ... Filter3
  *     |        |    ...    |
@@ -63,13 +59,13 @@ import org.apache.pig.pen.util.LineageTr
  * approach if not better in many cases because
  * of the availability of attachinInputs. An optimization
  * that can ensue is if there are multiple loads that
- * load the same file, they can be merged into one and 
- * then the operators that take input from the load 
+ * load the same file, they can be merged into one and
+ * then the operators that take input from the load
  * can be stored. This can be used when
  * the mapPlan executes to read the file only once and
- * attach the resulting tuple as inputs to all the 
+ * attach the resulting tuple as inputs to all the
  * operators that take input from this load.
- * 
+ *
  * In some cases where the conditions are exclusive and
  * some outputs are ignored, this approach can be worse.
  * But this leads to easier management of the Split and
@@ -79,24 +75,22 @@ import org.apache.pig.pen.util.LineageTr
 public class POSplit extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
-    
+
     /*
      * The filespec that is used to store and load the output of the split job
      * which is the job containing the split
      */
     private FileSpec splitStore;
-       
+
     /*
      * The list of sub-plans the inner plan is composed of
      */
     private List<PhysicalPlan> myPlans = new ArrayList<PhysicalPlan>();
-    
+
     private BitSet processedSet = new BitSet();
-    
-    private static Result empty = new Result(POStatus.STATUS_NULL, null);
-    
-    private boolean inpEOP = false;
-    
+
+    private transient boolean inpEOP = false;
+
     /**
      * Constructs an operator with the specified key
      * @param k the operator key
@@ -107,7 +101,7 @@ public class POSplit extends PhysicalOpe
 
     /**
      * Constructs an operator with the specified key
-     * and degree of parallelism 
+     * and degree of parallelism
      * @param k the operator key
      * @param rp the degree of parallelism requested
      */
@@ -116,7 +110,7 @@ public class POSplit extends PhysicalOpe
     }
 
     /**
-     * Constructs an operator with the specified key and inputs 
+     * Constructs an operator with the specified key and inputs
      * @param k the operator key
      * @param inp the inputs that this operator will read data from
      */
@@ -128,7 +122,7 @@ public class POSplit extends PhysicalOpe
      * Constructs an operator with the specified key,
      * degree of parallelism and inputs
      * @param k the operator key
-     * @param rp the degree of parallelism requested 
+     * @param rp the degree of parallelism requested
      * @param inp the inputs that this operator will read data from
      */
     public POSplit(OperatorKey k, int rp, List<PhysicalOperator> inp) {
@@ -172,20 +166,20 @@ public class POSplit extends PhysicalOpe
     }
 
     /**
-     * Returns the list of nested plans. 
+     * Returns the list of nested plans.
      * @return the list of the nested plans
      * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter
      */
     public List<PhysicalPlan> getPlans() {
         return myPlans;
     }
-    
+
     /**
-     * Appends the specified plan to the end of 
+     * Appends the specified plan to the end of
      * the nested input plan list
      * @param inPlan plan to be appended to the list
      */
-    public void addPlan(PhysicalPlan inPlan) {        
+    public void addPlan(PhysicalPlan inPlan) {
         myPlans.add(inPlan);
         processedSet.set(myPlans.size()-1);
     }
@@ -199,18 +193,18 @@ public class POSplit extends PhysicalOpe
         myPlans.remove(plan);
         processedSet.clear(myPlans.size());
     }
-   
+
     @Override
     public Result getNextTuple() throws ExecException {
 
         if (this.parentPlan.endOfAllInput) {
-            
-            return getStreamCloseResult();         
-        
-        } 
-        
+
+            return getStreamCloseResult();
+
+        }
+
         if (processedSet.cardinality() == myPlans.size()) {
-            
+
             Result inp = processInput();
 
             if (inp.returnStatus == POStatus.STATUS_EOP && this.parentPlan.endOfAllInput) {
@@ -221,44 +215,44 @@ public class POSplit extends PhysicalOpe
                 || inp.returnStatus == POStatus.STATUS_ERR ) {
                 return inp;
             }
-         
+
             Tuple tuple = (Tuple)inp.result;
             for (PhysicalPlan pl : myPlans) {
                 pl.attachInput(tuple);
             }
-            
+
             processedSet.clear();
         }
-        
-        return processPlan();                                       
+
+        return processPlan();
     }
 
     private Result processPlan() throws ExecException {
-   
+
         int idx = processedSet.nextClearBit(0);
         PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
-        
+
         Result res = runPipeline(leaf);
-        
+
         if (res.returnStatus == POStatus.STATUS_EOP) {
-            processedSet.set(idx++);        
+            processedSet.set(idx++);
             if (idx < myPlans.size()) {
                 res = processPlan();
             }
         }
-        
-        return (res.returnStatus == POStatus.STATUS_OK) ? res : empty;
+
+        return (res.returnStatus == POStatus.STATUS_OK) ? res : RESULT_EMPTY;
     }
-    
+
     private Result runPipeline(PhysicalOperator leaf) throws ExecException {
-       
+
         Result res = null;
-        
+
         while (true) {
-            
+
             res = leaf.getNextTuple();
-            
-            if (res.returnStatus == POStatus.STATUS_OK) {                
+
+            if (res.returnStatus == POStatus.STATUS_OK) {
                 break;
             } else if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
@@ -267,19 +261,19 @@ public class POSplit extends PhysicalOpe
             } else if (res.returnStatus == POStatus.STATUS_ERR) {
                 break;
             }
-        }   
-        
+        }
+
         return res;
     }
-    
+
     private Result getStreamCloseResult() throws ExecException {
         Result res = null;
-        
+
         while (true) {
-            
+
             if (processedSet.cardinality() == myPlans.size()) {
                 Result inp = processInput();
-                if (inp.returnStatus == POStatus.STATUS_OK) {                
+                if (inp.returnStatus == POStatus.STATUS_OK) {
                     Tuple tuple = (Tuple)inp.result;
                     for (PhysicalPlan pl : myPlans) {
                         pl.attachInput(tuple);
@@ -293,40 +287,48 @@ public class POSplit extends PhysicalOpe
                     return inp;
                 }
                 processedSet.clear();
-            } 
-            
+            }
+
             int idx = processedSet.nextClearBit(0);
             if (inpEOP ) {
                 myPlans.get(idx).endOfAllInput = true;
             }
             PhysicalOperator leaf = myPlans.get(idx).getLeaves().get(0);
-            
+
             res = leaf.getNextTuple();
-           
+
             if (res.returnStatus == POStatus.STATUS_EOP)  {
-                processedSet.set(idx++);        
+                processedSet.set(idx++);
                 if (idx < myPlans.size()) {
                     continue;
                 }
             } else {
                 break;
             }
-            
-            if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {                   
+
+            if (!inpEOP && res.returnStatus == POStatus.STATUS_EOP) {
                 continue;
             } else {
                 break;
             }
         }
-        
+
         return res;
-                
+
     }
-    
+
+    @Override
+    public POSplit clone() throws CloneNotSupportedException {
+        POSplit opClone = (POSplit) super.clone();
+        opClone.processedSet = new BitSet();
+        opClone.myPlans = clonePlans(myPlans);
+        return opClone;
+    }
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-      // no op  
+      // no op
       return null;
     }
-        
+
 }
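The getNextTuple() flow above attaches each input tuple to every nested plan, then drains the plans one at a time, tracking exhausted plans in the processedSet BitSet. A stripped-down sketch of that dispatch, not part of this commit, with hypothetical Plan/tuple types:

    import java.util.BitSet;
    import java.util.List;

    // Sketch only: POSplit-style fan-out and round-robin drain.
    class SplitDispatch<T> {
        interface Plan<T> { void attach(T input); T next(); } // null == EOP

        private final List<Plan<T>> plans;
        private final BitSet processed = new BitSet();

        SplitDispatch(List<Plan<T>> plans) {
            this.plans = plans;
            processed.set(0, plans.size()); // "all drained": first call attaches
        }

        T nextOutput(T input) {
            if (processed.cardinality() == plans.size()) {
                for (Plan<T> p : plans) {
                    p.attach(input); // fan the tuple out to every sub-plan
                }
                processed.clear();
            }
            int idx;
            while ((idx = processed.nextClearBit(0)) < plans.size()) {
                T out = plans.get(idx).next();
                if (out != null) {
                    return out;
                }
                processed.set(idx); // plan idx is done with this input
            }
            return null; // all drained; caller attaches fresh input next call
        }
    }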

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Fri Mar  4 18:17:39 2016
@@ -50,8 +50,8 @@ import org.apache.pig.tools.pigstats.Pig
 public class POStore extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
-    protected static Result empty = new Result(POStatus.STATUS_NULL, null);
     transient private StoreFuncInterface storer;
+    transient private StoreFuncDecorator sDecorator;
     transient private POStoreImpl impl;
     transient private String counterName = null;
     private FileSpec sFile;
@@ -161,10 +161,10 @@ public class POStore extends PhysicalOpe
             switch (res.returnStatus) {
             case POStatus.STATUS_OK:
                 if (illustrator == null) {
-                    storer.putNext((Tuple)res.result);
+                    sDecorator.putNext((Tuple) res.result);
                 } else
                     illustratorMarkup(res.result, res.result, 0);
-                res = empty;
+                res = RESULT_EMPTY;
 
                 if (counterName != null) {
                     ((MapReducePOStoreImpl) impl).incrRecordCounter(counterName, 1);
@@ -250,10 +250,24 @@ public class POStore extends PhysicalOpe
         if(storer == null){
             storer = (StoreFuncInterface)PigContext.instantiateFuncFromSpec(sFile.getFuncSpec());
             storer.setStoreFuncUDFContextSignature(signature);
+            // Init the Decorator we use for writing Tuples
+            setStoreFuncDecorator(new StoreFuncDecorator(storer, signature));
         }
         return storer;
     }
 
+    void setStoreFuncDecorator(StoreFuncDecorator sDecorator) {
+        this.sDecorator = sDecorator;
+    }
+
+    /**
+     *
+     * @return The {@link StoreFuncDecorator} used to write Tuples
+     */
+    public StoreFuncDecorator getStoreFuncDecorator() {
+        return sDecorator;
+    }
+
     /**
      * @param sortInfo the sortInfo to set
      */
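The StoreFuncDecorator wiring above gives POStore a single choke point for tuple writes instead of calling the raw StoreFuncInterface directly (the decorator exists to centralize write-time concerns such as error handling). A minimal decorator sketch, not part of this commit, with a hypothetical Storer interface standing in for StoreFuncInterface:

    // Sketch only: delegation shape of a store-func decorator.
    class StorerDecorator {
        interface Storer { void putNext(Object tuple) throws Exception; }

        private final Storer delegate;
        private final String signature;

        StorerDecorator(Storer delegate, String signature) {
            this.delegate = delegate;
            this.signature = signature;
        }

        void putNext(Object tuple) throws Exception {
            // one place to add error policy, counters, etc.
            delegate.putNext(tuple);
        }
    }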

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Fri Mar  4 18:17:39 2016
@@ -26,39 +26,38 @@ import java.util.concurrent.ArrayBlockin
 import java.util.concurrent.BlockingQueue;
 
 import org.apache.pig.PigException;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.streaming.ExecutableManager;
-import org.apache.pig.impl.streaming.StreamingCommand;
-import org.apache.pig.pen.util.ExampleTuple;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.pen.util.ExampleTuple;
 
 public class POStream extends PhysicalOperator {
-
     private static final long serialVersionUID = 2L;
-    
-    protected static final Result EOP_RESULT = new Result(POStatus.STATUS_EOP, null);
 
-    protected String executableManagerStr;            // String representing ExecutableManager to use
-    transient private ExecutableManager executableManager;    // ExecutableManager to use 
-    protected StreamingCommand command;               // Actual command to be run
+    private String executableManagerStr;            // String representing ExecutableManager to use
+    private StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
-    private boolean initialized = false;
-
     protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
 
     protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
 
-    protected boolean allInputFromPredecessorConsumed = false;
+    private transient ExecutableManager executableManager;    // ExecutableManager to use
+
+    private transient boolean initialized = false;
+
+    protected transient boolean allInputFromPredecessorConsumed = false;
 
-    protected boolean allOutputFromBinaryProcessed = false;
+    protected transient boolean allOutputFromBinaryProcessed = false;
 
     /**
      * This flag indicates whether streaming is done through fetching. If set,
@@ -68,7 +67,7 @@ public class POStream extends PhysicalOp
      */
     private boolean isFetchable;
 
-    public POStream(OperatorKey k, ExecutableManager executableManager, 
+    public POStream(OperatorKey k, ExecutableManager executableManager,
                       StreamingCommand command, Properties properties) {
         super(k);
         this.executableManagerStr = executableManager.getClass().getName();
@@ -77,21 +76,21 @@ public class POStream extends PhysicalOp
 
         // Setup streaming-specific properties
         if (command.getShipFiles()) {
-            parseShipCacheSpecs(command.getShipSpecs(), 
+            parseShipCacheSpecs(command.getShipSpecs(),
                                 properties, "pig.streaming.ship.files");
         }
-        parseShipCacheSpecs(command.getCacheSpecs(), 
+        parseShipCacheSpecs(command.getCacheSpecs(),
                             properties, "pig.streaming.cache.files");
     }
-    
-    private static void parseShipCacheSpecs(List<String> specs, 
+
+    private static void parseShipCacheSpecs(List<String> specs,
             Properties properties, String property) {
-        
+
         String existingValue = properties.getProperty(property, "");
         if (specs == null || specs.size() == 0) {
             return;
         }
-        
+
         // Setup streaming-specific properties
         StringBuffer sb = new StringBuffer();
         Iterator<String> i = specs.iterator();
@@ -108,13 +107,13 @@ public class POStream extends PhysicalOp
                 sb.append(", ");
             }
         }
-        properties.setProperty(property, sb.toString());        
+        properties.setProperty(property, sb.toString());
     }
 
     public Properties getShipCacheProperties() {
         return properties;
     }
-    
+
     /**
      * Get the {@link StreamingCommand} for this <code>StreamSpec</code>.
      * @return the {@link StreamingCommand} for this <code>StreamSpec</code>
@@ -122,17 +121,13 @@ public class POStream extends PhysicalOp
     public StreamingCommand getCommand() {
         return command;
     }
-    
-    
-    /* (non-Javadoc)
-     * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator#getNext(org.apache.pig.data.Tuple)
-     */
+
     @Override
     public Result getNextTuple() throws ExecException {
         // The POStream Operator works with ExecutableManager to
         // send input to the streaming binary and to get output
         // from it. To achieve a tuple oriented behavior, two queues
-        // are used - one for output from the binary and one for 
+        // are used - one for output from the binary and one for
         // input to the binary. In each getNext() call:
         // 1) If there is no more output expected from the binary, an EOP is
         // sent to successor
@@ -142,14 +137,14 @@ public class POStream extends PhysicalOp
         // send input to the binary, then the next tuple from the
         // predecessor is got and passed to the binary
         try {
-            // if we are being called AFTER all output from the streaming 
+            // if we are being called AFTER all output from the streaming
             // binary has already been sent to us then just return EOP
             // The "allOutputFromBinaryProcessed" flag is set when we see
             // an EOS (End of Stream output) from streaming binary
             if(allOutputFromBinaryProcessed) {
-                return new Result(POStatus.STATUS_EOP, null);
+                return RESULT_EOP;
             }
-            
+
             // if we are here AFTER all map() calls have been completed
             // AND AFTER we process all possible input to be sent to the
             // streaming binary, then all we want to do is read output from
@@ -160,19 +155,16 @@ public class POStream extends PhysicalOp
                     // If we received EOS, it means all output
                     // from the streaming binary has been sent to us
                     // So we can send an EOP to the successor in
-                    // the pipeline. Also since we are being called
-                    // after all input from predecessor has been processed
-                    // it means we got here from a call from close() in
-                    // map or reduce. So once we send this EOP down, 
-                    // getNext() in POStream should never be called. So
-                    // we don't need to set any flag noting we saw all output
-                    // from binary
-                    r = EOP_RESULT;
-                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    // the pipeline and also note this condition
+                    // for future calls
+                    r = RESULT_EOP;
+                    allOutputFromBinaryProcessed = true;
+                } else if (r.returnStatus == POStatus.STATUS_OK) {
                     illustratorMarkup(r.result, r.result, 0);
+                }
                 return(r);
             }
-            
+
             // if we are here, we haven't consumed all input to be sent
             // to the streaming binary - check if we are being called
             // from close() on the map or reduce
@@ -185,7 +177,7 @@ public class POStream extends PhysicalOp
                     // then "initialized" will be true. If not, just
                     // send EOP down.
                     if(getInitialized()) {
-                        // signal End of ALL input to the Executable Manager's 
+                        // signal End of ALL input to the Executable Manager's
                         // Input handler thread
                         binaryInputQueue.put(r);
                         // note this state for future calls
@@ -196,30 +188,24 @@ public class POStream extends PhysicalOp
                             // If we received EOS, it means all output
                             // from the streaming binary has been sent to us
                             // So we can send an EOP to the successor in
-                            // the pipeline. Also since we are being called
-                            // after all input from predecessor has been processed
-                            // it means we got here from a call from close() in
-                            // map or reduce. So once we send this EOP down, 
-                            // getNext() in POStream should never be called. So
-                            // we don't need to set any flag noting we saw all output
-                            // from binary
-                            r = EOP_RESULT;
+                            // the pipeline and also note this condition
+                            // for future calls
+                            r = RESULT_EOP;
+                            allOutputFromBinaryProcessed = true;
                         }
                     }
-                    
+
                 } else if(r.returnStatus == POStatus.STATUS_EOS) {
                     // If we received EOS, it means all output
                     // from the streaming binary has been sent to us
                     // So we can send an EOP to the successor in
-                    // the pipeline. Also we are being called
-                    // from close() in map or reduce (this is so because
-                    // only then this.parentPlan.endOfAllInput is true).
-                    // So once we send this EOP down, getNext() in POStream
-                    // should never be called. So we don't need to set any 
-                    // flag noting we saw all output from binary
-                    r = EOP_RESULT;
-                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    // the pipeline and also note this condition
+                    // for future calls
+                    r = RESULT_EOP;
+                    allOutputFromBinaryProcessed = true;
+                } else if (r.returnStatus == POStatus.STATUS_OK) {
                   illustratorMarkup(r.result, r.result, 0);
+                }
                 return r;
             } else {
                 // we are not being called from close() - so
@@ -231,20 +217,21 @@ public class POStream extends PhysicalOp
                     // So we can send an EOP to the successor in
                     // the pipeline and also note this condition
                     // for future calls
-                    r = EOP_RESULT;
+                    r = RESULT_EOP;
                     allOutputFromBinaryProcessed = true;
-                } else if (r.returnStatus == POStatus.STATUS_OK)
+                } else if (r.returnStatus == POStatus.STATUS_OK) {
                     illustratorMarkup(r.result, r.result, 0);
+                }
                 return r;
             }
-            
+
         } catch(Exception e) {
             int errCode = 2083;
             String msg = "Error while trying to get next result in POStream.";
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
-            
-        
+
+
     }
 
     public synchronized boolean getInitialized() {
@@ -265,13 +252,13 @@ public class POStream extends PhysicalOp
                         Result res = binaryOutputQueue.take();
                         return res;
                     }
-                    
-                    // check if we can write tuples to 
+
+                    // check if we can write tuples to
                     // input of the process
                     if(binaryInputQueue.remainingCapacity() > 0) {
-                        
+
                         Result input = processInput();
-                        if(input.returnStatus == POStatus.STATUS_EOP || 
+                        if(input.returnStatus == POStatus.STATUS_EOP ||
                                 input.returnStatus == POStatus.STATUS_ERR) {
                             return input;
                         } else {
@@ -279,16 +266,16 @@ public class POStream extends PhysicalOp
                             // Only when we see the first tuple that can
                             // be sent as input to the binary do we want
                             // to initialize the ExecutableManager and set
-                            // up the streaming binary - this is required in 
+                            // up the streaming binary - this is required in
                             // Unions with a JOIN, where one of the map tasks
                             // may never have any input to send to the binary
                             // - so we initialize only if we have to.
                             // initialize the ExecutableManager once
                             if(!initialized) {
                                 // set up the executableManager
-                                executableManager = 
+                                executableManager =
                                     (ExecutableManager)PigContext.instantiateFuncFromSpec(executableManagerStr);
-                                
+
                                 try {
                                     executableManager.configure(this);
                                     executableManager.run();
@@ -296,22 +283,23 @@ public class POStream extends PhysicalOp
                                     int errCode = 2084;
                                     String msg = "Error while running streaming binary.";
                                     throw new ExecException(msg, errCode, PigException.BUG, ioe);
-                                }            
+                                }
                                 initialized = true;
                             }
-                            
+
                             // send this input to the streaming
                             // process
                             binaryInputQueue.put(input);
                         }
-                        
+
                     } else {
-                        
+
                         // wait for either input to be available
                         // or output to be consumed
-                        while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty())
+                        while(binaryOutputQueue.isEmpty() && !binaryInputQueue.isEmpty()) {
                             wait();
-                        
+                        }
+
                     }
                 }
             }
@@ -321,21 +309,22 @@ public class POStream extends PhysicalOp
             throw new ExecException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
+    @Override
     public String toString() {
         return getAliasString() + "POStream" + "[" + command.toString() + "]"
                 + " - " + mKey.toString();
     }
- 
+
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
         v.visitStream(this);
-        
+
     }
 
     @Override
     public String name() {
-       return toString(); 
+        return toString();
     }
 
     @Override
@@ -349,7 +338,7 @@ public class POStream extends PhysicalOp
     }
 
     /**
-     * 
+     * Shuts down the streaming binary by closing its ExecutableManager.
      */
     public void finish() throws IOException {
         executableManager.close();
@@ -368,7 +357,7 @@ public class POStream extends PhysicalOp
     public BlockingQueue<Result> getBinaryOutputQueue() {
         return binaryOutputQueue;
     }
-    
+
     @Override
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
       if(illustrator != null) {
@@ -393,11 +382,13 @@ public class POStream extends PhysicalOp
         this.isFetchable = isFetchable;
     }
 
-    public POStream(POStream copy){
-        super(copy);
+    @Override
+    public PhysicalOperator clone() throws CloneNotSupportedException {
+        POStream clone = (POStream)super.clone();
+        clone.binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+        clone.binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+        // Not cloning StreamingCommand as it is read-only
+        return clone;
     }
 
-    public String getExecutableManagerStr() {
-        return executableManagerStr;
-    }
 }
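
For readers following the refactored getNextTuple() above: the patch stops
relying on the old assumption that getNext() is never called again after
close() - instead, every EOS path now returns the shared RESULT_EOP and
latches allOutputFromBinaryProcessed so later calls short-circuit. The
sketch below is illustrative only (Status, Msg and the method names are
stand-ins, not Pig's actual classes), but it captures the two behaviors the
patch establishes: lazy launch of the binary on first real input, and the
latched EOS-to-EOP translation.

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    // Minimal stand-in for the POStream handshake: one single-slot queue
    // feeds tuples to the external binary, the other carries its output
    // back. EOS from the binary is translated to EOP for the successor
    // AND latched, so later calls never touch the queues again.
    class StreamHandshakeSketch {

        enum Status { OK, EOP, EOS }

        static final class Msg {
            final Status status;
            final Object payload;
            Msg(Status status, Object payload) {
                this.status = status;
                this.payload = payload;
            }
        }

        // Shared immutable EOP result, in the spirit of RESULT_EOP above.
        private static final Msg RESULT_EOP = new Msg(Status.EOP, null);

        private final BlockingQueue<Msg> binaryInputQueue =
                new ArrayBlockingQueue<Msg>(1);
        private final BlockingQueue<Msg> binaryOutputQueue =
                new ArrayBlockingQueue<Msg>(1);

        private boolean initialized = false;   // lazy start of the binary
        private boolean allOutputFromBinaryProcessed = false;

        // Feed one tuple to the binary, starting it on first real input.
        void putInput(Object tuple) throws InterruptedException {
            if (!initialized) {
                // In the real operator this is where the ExecutableManager
                // is configured and run; deferred so tasks that never see
                // input never pay for launching the binary.
                initialized = true;
            }
            binaryInputQueue.put(new Msg(Status.OK, tuple));
        }

        // Pull one result, latching EOS so future calls short-circuit.
        Msg getNext() throws InterruptedException {
            if (allOutputFromBinaryProcessed) {
                return RESULT_EOP;             // latched on an earlier EOS
            }
            Msg r = binaryOutputQueue.take();
            if (r.status == Status.EOS) {
                allOutputFromBinaryProcessed = true; // latch on EVERY EOS path
                return RESULT_EOP;
            }
            return r;
        }
    }

The new clone() follows the same reasoning: each clone gets its own pair of
single-slot queues, since sharing them would let one cloned pipeline consume
another's results, while the read-only StreamingCommand can safely be shared.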

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Mar  4 18:17:39 2016
@@ -353,6 +353,10 @@ public class Packager implements Illustr
     public void setUseSecondaryKey(boolean useSecondaryKey) {
         this.useSecondaryKey = useSecondaryKey;
     }
+
+    public boolean getUseSecondaryKey() {
+        return useSecondaryKey;
+    }
 
     public void setPackageType(PackageType type) {
         this.pkgType = type;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Fri Mar  4 18:17:39 2016
@@ -72,7 +72,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORollupHIIForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -237,12 +236,6 @@ public class PlanHelper {
         }
 
         @Override
-        public void visitPORollupHIIForEach(PORollupHIIForEach hfe) throws VisitorException {
-            super.visitPORollupHIIForEach(hfe);
-            visit(hfe);
-        }
-
-        @Override
         public void visitUnion(POUnion un) throws VisitorException {
             super.visitUnion(un);
             visit(un);
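
The PlanHelper change above simply deletes the visitor hook for the removed
PORollupHIIForEach operator. For context, the enclosing visitor follows a
collect-by-type pattern - each visitXxx override calls super and then funnels
its operator into the shared visit(op) collector - so retiring an operator
class costs exactly one override. A hypothetical, much-simplified rendering
of that pattern (the flat Iterable and all names here are illustrative, not
Pig's actual plan/visitor API):

    import java.util.ArrayList;
    import java.util.List;

    // Collect every operator assignable to the requested class, mirroring
    // how each visitXxx in PlanHelper funnels into one shared visit(op).
    class OpCollectorSketch {
        static <T> List<T> collect(Iterable<?> plan, Class<T> wanted) {
            List<T> found = new ArrayList<T>();
            for (Object op : plan) {
                if (wanted.isInstance(op)) {
                    found.add(wanted.cast(op));  // the shared visit(op) step
                }
            }
            return found;
        }
    }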


