pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1733627 [13/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/mai...
Date: Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatusReporter.java Fri Mar  4 18:17:39 2016
@@ -21,8 +21,6 @@ package org.apache.pig.tools.pigstats;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.StatusReporter;
 import org.apache.hadoop.util.Progressable;
-import org.apache.pig.JVMReuseManager;
-import org.apache.pig.StaticDataCleanup;
 import org.apache.pig.backend.hadoop.executionengine.TaskContext;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -35,11 +33,7 @@ public class PigStatusReporter extends S
 
     private TaskContext<?> context = null;
 
-    static {
-        JVMReuseManager.getInstance().registerForStaticDataCleanup(PigStatusReporter.class);
-    }
-
-    @StaticDataCleanup
+    //@StaticDataCleanup
     public static void staticDataCleanup() {
         reporter = null;
     }
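
For context, the lines removed above implement Pig's JVM-reuse cleanup hook: a class registers itself with JVMReuseManager in a static initializer, and a method annotated @StaticDataCleanup resets its static state when a task JVM is reused. A minimal sketch of that idiom, using a hypothetical ExampleCache class (JVMReuseManager and StaticDataCleanup are the real org.apache.pig types whose imports are dropped above):

    import org.apache.pig.JVMReuseManager;
    import org.apache.pig.StaticDataCleanup;

    public class ExampleCache {
        private static ExampleCache instance = null;

        static {
            // Register once per class load so a reused task JVM can reset statics.
            JVMReuseManager.getInstance().registerForStaticDataCleanup(ExampleCache.class);
        }

        @StaticDataCleanup
        public static void staticDataCleanup() {
            instance = null; // drop state carried over from the previous task
        }
    }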

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/ScriptState.java Fri Mar  4 18:17:39 2016
@@ -63,6 +63,7 @@ import org.apache.pig.impl.plan.DepthFir
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.JarManager;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE;
 import org.apache.pig.newplan.logical.relational.LOCross;
@@ -165,7 +166,8 @@ public abstract class ScriptState {
 
     protected String id;
 
-    protected String script;
+    protected String serializedScript;
+    protected String truncatedScript;
     protected String commandLine;
     protected String fileName;
 
@@ -180,7 +182,8 @@ public abstract class ScriptState {
 
     protected ScriptState(String id) {
         this.id = id;
-        this.script = "";
+        this.serializedScript = "";
+        this.truncatedScript = "";
     }
 
     public static ScriptState get() {
@@ -272,7 +275,7 @@ public abstract class ScriptState {
         }
     }
 
-    public void setScript(File file) {
+    public void setScript(File file) throws IOException {
         BufferedReader reader = null;
         try {
             reader = new BufferedReader(new FileReader(file));
@@ -289,10 +292,18 @@ public abstract class ScriptState {
         }
     }
 
-    public void setScript(String script) {
+    public void setScript(String script) throws IOException {
         if (script == null)
             return;
 
+        //Retain the truncated script
+        setTruncatedScript(script);
+
+        //Serialize and encode the string.
+        this.serializedScript =  ObjectSerializer.serialize(script);
+    }
+
+    private void setTruncatedScript(String script) {
         // restrict the size of the script to be stored in job conf
         int maxScriptSize = 10240;
         if (pigContext != null) {
@@ -301,13 +312,10 @@ public abstract class ScriptState {
                 maxScriptSize = Integer.valueOf(prop);
             }
         }
-        script = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
+       
+        this.truncatedScript = (script.length() > maxScriptSize) ? script.substring(0, maxScriptSize)
                                                    : script;
 
-        // XML parser cann't handle certain characters, including
-        // the control character (&#1). Use Base64 encoding to
-        // get around this problem
-        this.script = new String(Base64.encodeBase64(script.getBytes()));
     }
 
     public void setScriptFeatures(LogicalPlan plan) {
@@ -372,11 +380,15 @@ public abstract class ScriptState {
         return (commandLine == null) ? "" : commandLine;
     }
 
-    protected String getScript() {
-        return (script == null) ? "" : script;
+    public String getSerializedScript() {
+        return (serializedScript == null) ? "" : serializedScript;
+    }
+
+    public String getScript() {
+        return (truncatedScript == null) ? "" : truncatedScript;
     }
 
-    protected void setScript(BufferedReader reader) {
+    protected void setScript(BufferedReader reader) throws IOException {
         StringBuilder sb = new StringBuilder();
         try {
             String line = reader.readLine();
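
The change above stops Base64-encoding the truncated script directly; the script is now kept twice, as a plain truncated copy for display and as a fully serialized copy for the job conf. A hedged sketch of the round trip a consumer might perform, assuming the real org.apache.pig.impl.util.ObjectSerializer (serialize() Java-serializes and Base64-encodes, so the result is safe for XML/job-conf storage; deserialize() reverses it):

    import java.io.IOException;

    import org.apache.pig.impl.util.ObjectSerializer;

    public class ScriptRoundTrip {
        public static void main(String[] args) throws IOException {
            String script = "a = load 'in'; b = filter a by $0 is not null;";
            String encoded = ObjectSerializer.serialize(script);            // stored in job conf
            String decoded = (String) ObjectSerializer.deserialize(encoded);
            System.out.println(script.equals(decoded));                     // true
        }
    }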

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/mapreduce/MRScriptState.java Fri Mar  4 18:17:39 2016
@@ -72,7 +72,7 @@ public class MRScriptState extends Scrip
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
         conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
         conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
-        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
+        conf.set(PIG_PROPERTY.SCRIPT.toString(), getSerializedScript());
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
 
         try {

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Fri Mar  4 18:17:39 2016
@@ -33,13 +33,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.PlanVisitor;
 import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.apache.pig.tools.pigstats.PigStats.JobGraphPrinter;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.CounterGroup;
@@ -66,9 +70,20 @@ public class TezDAGStats extends JobStat
     public static final String TASK_COUNTER_GROUP = TaskCounter.class.getName();
     public static final String PIG_COUNTER_GROUP = org.apache.pig.PigCounters.class.getName();
 
+    public static final String SUCCESS_HEADER = String.format("VertexId Parallelism TotalTasks"
+            + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s"
+            + " Alias\tFeature\tOutputs",
+            "InputRecords", "ReduceInputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+
+    public static final String FAILURE_HEADER = String.format("VertexId  State Parallelism TotalTasks"
+            + " %1$14s %2$20s %3$14s %4$14s %5$16s %6$14s %7$16s"
+            + " Alias\tFeature\tOutputs",
+            "InputRecords", "ReduceInputRecords", "OutputRecords", "FileBytesRead", "FileBytesWritten", "HdfsBytesRead", "HdfsBytesWritten");
+
     private Map<String, TezVertexStats> tezVertexStatsMap;
 
     private String appId;
+    private StringBuilder tezDAGPlan;
 
     private int totalTasks = -1;
     private long fileBytesRead = -1;
@@ -87,37 +102,44 @@ public class TezDAGStats extends JobStat
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
 
-    private HashMap<String, Long> multiStoreCounters
+    private Map<String, Long> multiStoreCounters
             = new HashMap<String, Long>();
 
+    private Map<String, OutputStats> outputsByLocation
+            = new HashMap<String, OutputStats>();
+
     /**
      * This class builds the graph of a Tez DAG vertices.
      */
-    static class JobGraphBuilder extends TezOpPlanVisitor {
+    static class TezDAGStatsBuilder extends TezOpPlanVisitor {
 
+        private TezPlanContainerNode tezPlanNode;
         private JobGraph jobPlan;
         private Map<String, TezVertexStats> tezVertexStatsMap;
         private List<TezVertexStats> vertexStatsToBeRemoved;
         private TezDAGScriptInfo dagScriptInfo;
+        private StringBuilder tezDAGPlan;
 
-        public JobGraphBuilder(TezOperPlan plan, TezDAGScriptInfo dagScriptInfo) {
-            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
-            tezVertexStatsMap = new HashMap<String, TezVertexStats>();
-            vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
-            jobPlan = new JobGraph();
+        public TezDAGStatsBuilder(TezPlanContainerNode tezPlanNode, TezDAGScriptInfo dagScriptInfo) {
+            super(tezPlanNode.getTezOperPlan(), new DependencyOrderWalker<TezOperator, TezOperPlan>(tezPlanNode.getTezOperPlan()));
+            this.tezPlanNode = tezPlanNode;
+            this.tezVertexStatsMap = new HashMap<String, TezVertexStats>();
+            this.vertexStatsToBeRemoved = new ArrayList<TezVertexStats>();
+            this.jobPlan = new JobGraph();
+            this.tezDAGPlan = new StringBuilder();
             this.dagScriptInfo = dagScriptInfo;
         }
 
-        public Map<String, TezVertexStats> getTezVertexStatsMap() {
-            return tezVertexStatsMap;
-        }
-
-        public JobGraph getJobPlan() {
-            return jobPlan;
+        public TezDAGStats build() throws VisitorException {
+            visit();
+            TezDAGStats dagStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobPlan, tezVertexStatsMap, tezDAGPlan);
+            dagStats.setAlias(dagScriptInfo);
+            return dagStats;
         }
 
         @Override
         public void visitTezOp(TezOperator tezOp) throws VisitorException {
+            TezPrinter.TezVertexGraphPrinter.writePlan(mPlan, tezOp, tezDAGPlan);
             TezVertexStats currStats =
                     new TezVertexStats(tezOp.getOperatorKey().toString(), jobPlan, tezOp.isUseMRMapSettings());
             jobPlan.add(currStats);
@@ -131,7 +153,7 @@ public class TezDAGStats extends JobStat
                 }
             }
 
-         // Remove VertexGroups (union) from JobGraph since they're not
+            // Remove VertexGroups (union) from JobGraph since they're not
             // materialized as real vertices by Tez.
             if (tezOp.isVertexGroup()) {
                 vertexStatsToBeRemoved.add(currStats);
@@ -158,9 +180,10 @@ public class TezDAGStats extends JobStat
 
     }
 
-    protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap) {
+    protected TezDAGStats(String name, JobGraph plan, Map<String, TezVertexStats> tezVertexStatsMap, StringBuilder tezDAGPlan) {
         super(name, plan);
         this.tezVertexStatsMap = tezVertexStatsMap;
+        this.tezDAGPlan = tezDAGPlan;
     }
 
     public TezVertexStats getVertexStats(String vertexName) {
@@ -191,10 +214,10 @@ public class TezDAGStats extends JobStat
             totalTasks = (int) dagGrp.findCounter("TOTAL_LAUNCHED_TASKS").getValue();
 
             CounterGroup fsGrp = tezCounters.getGroup(FS_COUNTER_GROUP);
-            fileBytesRead = fsGrp.findCounter("FILE_BYTES_READ").getValue();
-            fileBytesWritten = fsGrp.findCounter("FILE_BYTES_WRITTEN").getValue();
-            hdfsBytesRead = fsGrp.findCounter("HDFS_BYTES_READ").getValue();
-            hdfsBytesWritten = fsGrp.findCounter("HDFS_BYTES_WRITTEN").getValue();
+            fileBytesRead = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_READ).getValue();
+            fileBytesWritten = fsGrp.findCounter(PigStatsUtil.FILE_BYTES_WRITTEN).getValue();
+            hdfsBytesRead = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_READ).getValue();
+            hdfsBytesWritten = fsGrp.findCounter(PigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
         } else {
             LOG.warn("Failed to get counters for DAG: " + dag.getName());
         }
@@ -217,7 +240,28 @@ public class TezDAGStats extends JobStat
                     inputs.addAll(vertexStats.getInputs());
                 }
                 if(vertexStats.getOutputs() != null  && !vertexStats.getOutputs().isEmpty()) {
-                    outputs.addAll(vertexStats.getOutputs());
+                    for (OutputStats output : vertexStats.getOutputs()) {
+                        if (outputsByLocation.get(output.getLocation()) != null) {
+                            OutputStats existingOut = outputsByLocation.get(output.getLocation());
+                            // In case of multistore, bytesWritten is already calculated
+                            // from size of all the files in the output directory.
+                            if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) {
+                                long bytes = existingOut.getBytes() > -1
+                                        ? (existingOut.getBytes() + output.getBytes())
+                                        : output.getBytes();
+                                existingOut.setBytes(bytes);
+                            }
+                            if (output.getRecords() > -1) {
+                                long records = existingOut.getRecords() > -1
+                                        ? (existingOut.getRecords() + output.getRecords())
+                                        : output.getRecords();
+                                existingOut.setRecords(records);
+                            }
+                        } else {
+                            outputs.add(output);
+                            outputsByLocation.put(output.getLocation(), output);
+                        }
+                    }
                 }
                 /*if (vertexStats.getHdfsBytesRead() >= 0) {
                     hdfsBytesRead = (hdfsBytesRead == -1) ? 0 : hdfsBytesRead;
@@ -275,21 +319,53 @@ public class TezDAGStats extends JobStat
     public String getDisplayString() {
             StringBuilder sb = new StringBuilder();
 
-            sb.append("DAG " + name + ":\n");
-            sb.append(String.format("%1$20s: %2$-100s%n", "ApplicationId",
+            sb.append(String.format("%1$40s: %2$-100s%n", "Name",
+                    name));
+            sb.append(String.format("%1$40s: %2$-100s%n", "ApplicationId",
                     appId));
-            sb.append(String.format("%1$20s: %2$-100s%n", "TotalLaunchedTasks",
+            sb.append(String.format("%1$40s: %2$-100s%n", "TotalLaunchedTasks",
                     totalTasks));
 
-            sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesRead",
+            sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesRead",
                     fileBytesRead));
-            sb.append(String.format("%1$20s: %2$-100s%n", "FileBytesWritten",
+            sb.append(String.format("%1$40s: %2$-100s%n", "FileBytesWritten",
                     fileBytesWritten));
-            sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesRead",
+            sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesRead",
                     hdfsBytesRead));
-            sb.append(String.format("%1$20s: %2$-100s%n", "HdfsBytesWritten",
+            sb.append(String.format("%1$40s: %2$-100s%n", "HdfsBytesWritten",
                     hdfsBytesWritten));
 
+            sb.append(String.format("%1$40s: %2$-100s%n", "SpillableMemoryManager spill count",
+                    spillCount));
+            sb.append(String.format("%1$40s: %2$-100s%n", "Bags proactively spilled",
+                    activeSpillCountObj));
+            sb.append(String.format("%1$40s: %2$-100s%n", "Records proactively spilled",
+                    activeSpillCountRecs));
+
+
+            sb.append("\nDAG Plan:\n");
+            sb.append(tezDAGPlan);
+
+            List<JobStats> success = ((JobGraph)getPlan()).getSuccessfulJobs();
+            List<JobStats> failed = ((JobGraph)getPlan()).getFailedJobs();
+
+            if (success != null && !success.isEmpty()) {
+                sb.append("\nVertex Stats:\n");
+                sb.append(SUCCESS_HEADER).append("\n");
+                for (JobStats js : success) {
+                    sb.append(js.getDisplayString());
+                }
+            }
+
+            if (failed != null && !failed.isEmpty()) {
+                sb.append("\nFailed vertices:\n");
+                sb.append(FAILURE_HEADER).append("\n");
+                for (JobStats js : failed) {
+                    sb.append(js.getDisplayString());
+                }
+                sb.append("\n");
+            }
+
             return sb.toString();
     }
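
The new SUCCESS_HEADER/FAILURE_HEADER constants and the widened labels in getDisplayString() rely on java.util.Formatter positional specifiers: %1$14s right-aligns argument 1 in a 14-character field, %2$-100s left-aligns argument 2 in a 100-character field, and %n emits a platform line separator. A small self-contained illustration (the application id value is made up):

    public class HeaderDemo {
        public static void main(String[] args) {
            // "InputRecords" (12 chars) is right-aligned in a 14-char field.
            System.out.println(String.format("VertexId TotalTasks %1$14s %2$20s",
                    "InputRecords", "ReduceInputRecords"));
            // Label/value layout used by getDisplayString() after this change.
            System.out.print(String.format("%1$40s: %2$-100s%n",
                    "ApplicationId", "application_0000000000000_0000"));
        }
    }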
 

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezPigScriptStats.java Fri Mar  4 18:17:39 2016
@@ -43,6 +43,8 @@ import org.apache.pig.tools.pigstats.Out
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.client.DAGStatus;
 
 import com.google.common.collect.Maps;
@@ -73,10 +75,8 @@ public class TezPigScriptStats extends P
         public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanNode) throws VisitorException {
             TezScriptState ss = TezScriptState.get();
             TezDAGScriptInfo dagScriptInfo = ss.setDAGScriptInfo(tezPlanNode);
-            TezDAGStats.JobGraphBuilder jobGraphBuilder = new TezDAGStats.JobGraphBuilder(tezPlanNode.getTezOperPlan(), dagScriptInfo);
-            jobGraphBuilder.visit();
-            TezDAGStats currStats = new TezDAGStats(tezPlanNode.getOperatorKey().toString(), jobGraphBuilder.getJobPlan(), jobGraphBuilder.getTezVertexStatsMap());
-            currStats.setAlias(dagScriptInfo);
+            TezDAGStats.TezDAGStatsBuilder builder = new TezDAGStats.TezDAGStatsBuilder(tezPlanNode, dagScriptInfo);
+            TezDAGStats currStats = builder.build();
             jobPlan.add(currStats);
             List<TezPlanContainerNode> preds = getPlan().getPredecessors(tezPlanNode);
             if (preds != null) {
@@ -109,7 +109,11 @@ public class TezPigScriptStats extends P
 
     public void finish() {
         super.stop();
-        display();
+        try {
+            display();
+        } catch (Throwable e) {
+            LOG.warn("Exception while displaying stats:", e);
+        }
     }
 
     private void display() {
@@ -149,7 +153,10 @@ public class TezPigScriptStats extends P
             }
         }
 
+        int count = 0;
         for (TezDAGStats dagStats : tezDAGStatsMap.values()) {
+            sb.append("\n");
+            sb.append("DAG " + count++ + ":\n");
             sb.append(dagStats.getDisplayString());
             sb.append("\n");
         }
@@ -186,11 +193,24 @@ public class TezPigScriptStats extends P
                 tezScriptState.emitjobFinishedNotification(tezDAGStats);
             } else if (dagStatus.getState() == DAGStatus.State.FAILED) {
                 tezDAGStats.setSuccessful(false);
-                tezDAGStats.setErrorMsg(tezJob.getDiagnostics());
+                String diagnostics = tezJob.getDiagnostics();
+                tezDAGStats.setErrorMsg(diagnostics);
+                tezDAGStats.setBackendException(new TezException(diagnostics));
                 tezScriptState.emitJobFailedNotification(tezDAGStats);
             }
             tezScriptState.dagCompletedNotification(tezJob.getName(), tezDAGStats);
         }
+
+        if (!tezDAGStats.isSuccessful()) {
+            String outputCommitOnDAGSuccess = pigContext.getProperties().getProperty(
+                    TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS);
+            if ((outputCommitOnDAGSuccess == null && TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT)
+                    || "true".equals(outputCommitOnDAGSuccess)) {
+                for (OutputStats stats : tezDAGStats.getOutputs()) {
+                    stats.setSuccessful(false);
+                }
+            }
+        }
     }
 
     public TezDAGStats addTezJobStatsForNative(String dagName, NativeTezOper tezOper, boolean success) {
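
The failure handling added above only marks a DAG's outputs unsuccessful when Tez is set to commit all outputs on DAG success (the default); with per-output commit, an output can be valid even though the DAG as a whole failed. A hedged sketch of that property resolution, assuming Tez's TezConfiguration constants on the classpath (the patch itself compares against the literal string "true"):

    import java.util.Properties;

    import org.apache.tez.dag.api.TezConfiguration;

    public class CommitFlag {
        // Unset property -> Tez default; otherwise honor the explicit value.
        static boolean commitAllOutputsOnDagSuccess(Properties props) {
            String v = props.getProperty(
                    TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS);
            return v == null
                    ? TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS_DEFAULT
                    : "true".equals(v);
        }
    }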

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Fri Mar  4 18:17:39 2016
@@ -107,7 +107,7 @@ public class TezScriptState extends Scri
         conf.set(PIG_PROPERTY.HADOOP_VERSION.toString(), getHadoopVersion());
         conf.set(PIG_PROPERTY.VERSION.toString(), getPigVersion());
         conf.set(PIG_PROPERTY.SCRIPT_ID.toString(), id);
-        conf.set(PIG_PROPERTY.SCRIPT.toString(), getScript());
+        conf.set(PIG_PROPERTY.SCRIPT.toString(), getSerializedScript());
         conf.set(PIG_PROPERTY.COMMAND_LINE.toString(), getCommandLine());
     }
 

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Fri Mar  4 18:17:39 2016
@@ -22,7 +22,7 @@ import static org.apache.pig.tools.pigst
 import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapred.Counters;
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -49,6 +50,7 @@ import org.apache.tez.common.counters.Ta
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.client.VertexStatus;
+import org.apache.tez.dag.api.client.VertexStatus.State;
 
 import com.google.common.collect.Maps;
 
@@ -62,25 +64,25 @@ public class TezVertexStats extends JobS
 
     private boolean isMapOpts;
     private int parallelism;
+    private State vertexState;
     // CounterGroup, Counter, Value
     private Map<String, Map<String, Long>> counters = null;
 
     private List<POStore> stores = null;
     private List<FileSpec> loads = null;
 
-    private int numberMaps = 0;
-    private int numberReduces = 0;
-
-    private long mapInputRecords = 0;
-    private long mapOutputRecords = 0;
-    private long reduceInputRecords = 0;
-    private long reduceOutputRecords = 0;
+    private int numTasks = 0;
+    private long numInputRecords = 0;
+    private long numReduceInputRecords = 0;
+    private long numOutputRecords = 0;
+    private long fileBytesRead = 0;
+    private long fileBytesWritten = 0;
     private long spillCount = 0;
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
 
-    private HashMap<String, Long> multiStoreCounters
-            = new HashMap<String, Long>();
+    private Map<String, Long> multiInputCounters = Maps.newHashMap();
+    private Map<String, Long> multiStoreCounters = Maps.newHashMap();
 
     public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
         super(name, plan);
@@ -103,13 +105,25 @@ public class TezVertexStats extends JobS
     @Override
     public String getDisplayString() {
         StringBuilder sb = new StringBuilder();
-        sb.append(String.format("%1$20s: %2$-100s%n", "VertexName", name));
-        if (getAlias() != null && !getAlias().isEmpty()) {
-            sb.append(String.format("%1$20s: %2$-100s%n", "Alias", getAlias()));
+        sb.append(String.format("%-10s ", name));
+        if (state == JobState.FAILED) {
+            sb.append(vertexState.name());
         }
-        if (getFeature() != null && !getFeature().isEmpty()) {
-            sb.append(String.format("%1$20s: %2$-100s%n", "Features", getFeature()));
+        sb.append(String.format("%9s ", parallelism));
+        sb.append(String.format("%10s ", numTasks));
+        sb.append(String.format("%14s ", numInputRecords));
+        sb.append(String.format("%20s ", numReduceInputRecords));
+        sb.append(String.format("%14s ", numOutputRecords));
+        sb.append(String.format("%14s ", fileBytesRead));
+        sb.append(String.format("%16s ", fileBytesWritten));
+        sb.append(String.format("%14s ", hdfsBytesRead));
+        sb.append(String.format("%16s ", hdfsBytesWritten));
+        sb.append(getAlias()).append("\t");
+        sb.append(getFeature()).append("\t");
+        for (OutputStats os : outputs) {
+            sb.append(os.getLocation()).append(",");
         }
+        sb.append("\n");
         return sb.toString();
     }
 
@@ -123,7 +137,7 @@ public class TezVertexStats extends JobS
             this.stores = (List<POStore>) ObjectSerializer.deserialize(
                     conf.get(JobControlCompiler.PIG_REDUCE_STORES));
             this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get("pig.inputs"));
+                    conf.get(PigInputFormat.PIG_INPUTS));
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
         }
@@ -138,17 +152,12 @@ public class TezVertexStats extends JobS
     }
 
     public void accumulateStats(VertexStatus status, int parallelism) {
-        hdfsBytesRead = -1;
-        hdfsBytesWritten = -1;
 
         if (status != null) {
             setSuccessful(status.getState().equals(VertexStatus.State.SUCCEEDED));
-            this.parallelism = parallelism;
-            if (this.isMapOpts) {
-                numberMaps += parallelism;
-            } else {
-                numberReduces += parallelism;
-            }
+            this.vertexState = status.getState();
+            this.parallelism = parallelism; //compile time parallelism
+            this.numTasks = status.getProgress().getTotalTaskCount(); //run time parallelism
             TezCounters tezCounters = status.getVertexCounters();
             counters = Maps.newHashMap();
             Iterator<CounterGroup> grpIt = tezCounters.iterator();
@@ -163,14 +172,22 @@ public class TezVertexStats extends JobS
                 counters.put(grp.getName(), cntMap);
             }
 
-            if (counters.get(FS_COUNTER_GROUP) != null &&
-                    counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
-                hdfsBytesRead = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
-            }
-            if (counters.get(FS_COUNTER_GROUP) != null &&
-                    counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN) != null) {
-                hdfsBytesWritten = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+            Map<String, Long> fsCounters = counters.get(FS_COUNTER_GROUP);
+            if (fsCounters != null) {
+                if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_READ)) {
+                    this.hdfsBytesRead = fsCounters.get(PigStatsUtil.HDFS_BYTES_READ);
+                }
+                if (fsCounters.containsKey(PigStatsUtil.HDFS_BYTES_WRITTEN)) {
+                    this.hdfsBytesWritten = fsCounters.get(PigStatsUtil.HDFS_BYTES_WRITTEN);
+                }
+                if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_READ)) {
+                    this.fileBytesRead = fsCounters.get(PigStatsUtil.FILE_BYTES_READ);
+                }
+                if (fsCounters.containsKey(PigStatsUtil.FILE_BYTES_WRITTEN)) {
+                    this.fileBytesWritten = fsCounters.get(PigStatsUtil.FILE_BYTES_WRITTEN);
+                }
             }
+
             Map<String, Long> pigCounters = counters.get(PIG_COUNTER_GROUP);
             if (pigCounters != null) {
                 if (pigCounters.containsKey(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT)) {
@@ -198,24 +215,44 @@ public class TezVertexStats extends JobS
     }
 
     public void addInputStatistics() {
+
+        long inputRecords = -1;
+        Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+        if (taskCounters != null) {
+            if (taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
+                inputRecords = taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
+                numInputRecords = inputRecords;
+            }
+            if (taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name()) != null) {
+                numReduceInputRecords = taskCounters.get(TaskCounter.REDUCE_INPUT_RECORDS.name());
+            }
+        }
+
         if (loads == null) {
             return;
         }
 
+        Map<String, Long> mIGroup = counters.get(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+        if (mIGroup != null) {
+            multiInputCounters.putAll(mIGroup);
+        }
+
+        // There is always only one load in a Tez vertex
         for (FileSpec fs : loads) {
             long records = -1;
             long hdfsBytesRead = -1;
             String filename = fs.getFileName();
             if (counters != null) {
-                Map<String, Long> taskCounter = counters.get(TASK_COUNTER_GROUP);
-                if (taskCounter != null
-                        && taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
-                    records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
-                    if (this.isMapOpts) {
-                        mapInputRecords += records;
-                    } else {
-                        reduceInputRecords += records;
-                    }
+                if (mIGroup != null) {
+                    Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getFileName(), 0));
+                    if (n != null) records = n;
+                }
+                if (records == -1) {
+                    records = inputRecords;
+                }
+                if (isSuccessful() && records == -1) {
+                    // Tez removes 0 value counters for efficiency.
+                    records = 0;
                 }
                 if (counters.get(FS_COUNTER_GROUP) != null &&
                         counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
@@ -230,10 +267,25 @@ public class TezVertexStats extends JobS
     }
 
     public void addOutputStatistics() {
+
+        long outputRecords = -1;
+
+        Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+        if (taskCounters != null
+                && taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+            outputRecords = taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
+            numOutputRecords = outputRecords;
+        }
+
         if (stores == null) {
             return;
         }
 
+        Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        if (msGroup != null) {
+            multiStoreCounters.putAll(msGroup);
+        }
+
         for (POStore sto : stores) {
             if (sto.isTmpStore()) {
                 continue;
@@ -242,23 +294,16 @@ public class TezVertexStats extends JobS
             long hdfsBytesWritten = -1;
             String filename = sto.getSFile().getFileName();
             if (counters != null) {
-                if (sto.isMultiStore()) {
-                    Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-                    if (msGroup != null) {
-                        multiStoreCounters.putAll(msGroup);
-                        Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
-                        if (n != null) records = n;
-                    }
-                } else if (counters.get(TASK_COUNTER_GROUP) != null
-                        && counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
-                    records = counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
-                }
-                if (records != -1) {
-                    if (this.isMapOpts) {
-                        mapOutputRecords += records;
-                    } else {
-                        reduceOutputRecords += records;
-                    }
+                if (msGroup != null) {
+                    Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
+                    if (n != null) records = n;
+                }
+                if (records == -1) {
+                    records = outputRecords;
+                }
+                if (isSuccessful() && records == -1) {
+                    // Tez removes 0 value counters for efficiency.
+                    records = 0;
                 }
             }
             /* TODO: Need to check FILE_BYTES_WRITTEN for local mode */
@@ -284,13 +329,13 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public int getNumberMaps() {
-        return numberMaps;
+        return this.isMapOpts ? numTasks : -1;
     }
 
     @Override
     @Deprecated
     public int getNumberReduces() {
-        return numberReduces;
+        return this.isMapOpts ? -1 : numTasks;
     }
 
     @Override
@@ -332,25 +377,25 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public long getMapInputRecords() {
-        return mapInputRecords;
+        return this.isMapOpts ? numInputRecords : -1;
     }
 
     @Override
     @Deprecated
     public long getMapOutputRecords() {
-        return mapOutputRecords;
+        return this.isMapOpts ? numOutputRecords : -1;
     }
 
     @Override
     @Deprecated
     public long getReduceInputRecords() {
-        return reduceInputRecords;
+        return this.isMapOpts ? -1 : numInputRecords;
     }
 
     @Override
     @Deprecated
     public long getReduceOutputRecords() {
-        return reduceOutputRecords;
+        return this.isMapOpts ? -1 : numOutputRecords;
     }
 
     @Override
@@ -377,7 +422,7 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public Map<String, Long> getMultiStoreCounters() {
-        return multiStoreCounters;
+        return Collections.unmodifiableMap(multiStoreCounters);
     }
 
     @Override
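
A recurring pattern in the rewritten addInputStatistics()/addOutputStatistics() above is a fallback chain for per-file record counts: prefer the multi-input/multi-store counter, fall back to the vertex-level task counter, and finally treat a missing counter on a successful vertex as 0, since Tez drops zero-valued counters for efficiency. A minimal sketch of that chain (the helper name recordsFor is hypothetical):

    import java.util.Map;

    public class CounterFallback {
        // Hypothetical helper mirroring the fallback chain in the patch.
        static long recordsFor(Map<String, Long> counterGroup, String counterName,
                               long vertexLevelCount, boolean vertexSucceeded) {
            Long v = (counterGroup == null) ? null : counterGroup.get(counterName);
            long records = (v != null) ? v : vertexLevelCount;  // may still be -1
            if (records == -1 && vertexSucceeded) {
                records = 0;  // Tez removes 0-value counters for efficiency.
            }
            return records;
        }
    }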

Modified: pig/branches/spark/src/pig-default.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/pig-default.properties?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/pig-default.properties (original)
+++ pig/branches/spark/src/pig-default.properties Fri Mar  4 18:17:39 2016
@@ -57,4 +57,8 @@ pig.sql.type=hcat
 pig.output.committer.recovery.support=false
 
 pig.stats.output.size.reader=org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.FileBasedOutputSizeReader
-pig.stats.output.size.reader.unsupported=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer
+pig.stats.output.size.reader.unsupported=org.apache.pig.builtin.mock.Storage,org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage
+
+pig.tez.opt.union.unsupported.storefuncs=org.apache.hcatalog.pig.HCatStorer,org.apache.hive.hcatalog.pig.HCatStorer,org.apache.pig.piggybank.storage.DBStorage,org.apache.pig.piggybank.storage.MultiStorage
+
+pig.sort.readonce.loadfuncs=org.apache.pig.backend.hadoop.hbase.HBaseStorage,org.apache.pig.backend.hadoop.accumulo.AccumuloStorage
\ No newline at end of file

Propchange: pig/branches/spark/src/pig-default.properties
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar  4 18:17:39 2016
@@ -1,2 +1,2 @@
 /hadoop/pig/branches/multiquery/conf/pig.properties:741727-770826
-/pig/trunk/src/pig-default.properties:1621676-1622566
+/pig/trunk/src/pig-default.properties:1621676-1733612

Modified: pig/branches/spark/src/python/streaming/controller.py
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/python/streaming/controller.py?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/python/streaming/controller.py (original)
+++ pig/branches/spark/src/python/streaming/controller.py Fri Mar  4 18:17:39 2016
@@ -125,7 +125,12 @@ class PythonStreamingController:
                 try:
                     func_output = func(*inputs)
                     if should_log:
-                        log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output)))
+                        try:
+                            log_message("Row %s: UDF Output: %s" % (self.input_count, unicode(func_output)))
+                        except:
+                            #This is probably an error converting the output to unicode; calling unicode() on a
+                            #bytearray throws an exception.  Since it's just a log statement, skip it and carry on.
+                            logging.exception("Couldn't log output.  Try to continue.")
                 except:
                     #These errors should always be caused by user code.
                     write_user_exception(module_name, self.stream_error, NUM_LINES_OFFSET_TRACE)

Modified: pig/branches/spark/test/e2e/pig/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/build.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/build.xml (original)
+++ pig/branches/spark/test/e2e/pig/build.xml Fri Mar  4 18:17:39 2016
@@ -31,12 +31,16 @@
     <equals arg1="${hadoopversion}" arg2="23" />
   </condition>
 
+  <property name="mvnrepo" value="http://repo2.maven.org/maven2"/>
+
   <!-- Separate property name for udfs' build.xml -->
   <property name="udf.dir" value="${basedir}/udfs"/>
   <property name="udf.java.dir" value="${udf.dir}/java"/>
   <property name="udf.jar" value="${udf.java.dir}/testudf.jar"/>
   <property name="python.udf.dir" value="${udf.dir}/python"/>
+  <property name="js.udf.dir" value="${udf.dir}/js" />
   <property name="ruby.udf.dir" value="${udf.dir}/ruby" />
+  <property name="groovy.udf.dir" value="${udf.dir}/groovy" />
   <property name="cpython.udf.dir" value="${udf.dir}/cpython" />
   <property name="params.dir" value="${basedir}/paramfiles"/>
   <property name="e2e.lib.dir" value="${basedir}/lib"/>
@@ -128,12 +132,34 @@
     </not>
   </condition>
 
+  <pathconvert property="tests.suites.all" pathsep=" ">
+      <path path="${test.location}/tests/cmdline.conf"/>
+      <path path="${test.location}/tests/multiquery.conf"/>
+      <path path="${test.location}/tests/negative.conf"/>
+      <path path="${test.location}/tests/nightly.conf"/>
+      <path path="${test.location}/tests/streaming.conf"/>
+      <path path="${test.location}/tests/streaming_local.conf"/>
+      <path path="${test.location}/tests/turing_jython.conf"/>
+      <path path="${test.location}/tests/bigdata.conf"/>
+      <path path="${test.location}/tests/grunt.conf"/>
+      <path path="${test.location}/tests/macro.conf"/>
+      <path path="${test.location}/tests/orc.conf"/>
+      <path path="${test.location}/tests/hcat.conf"/>
+      <path path="${test.location}/tests/utf8.conf"/>
+  </pathconvert>
+
+  <condition property="tests.suites" value="${tests.suites.all}">
+    <not>
+      <isset property="tests.suites"/>
+    </not>
+  </condition>
+
   <target name="udfs">
     <ant dir="${udf.java.dir}"/>
   </target>
 
   <!-- Build an archive to use in the tests -->
-  <target name="tar" description="Create tar file with pig modules">
+  <target name="tar" description="Create tar file with pig modules" depends="download-datafu">
     <mkdir dir="${tar.dir}"/>
     <mkdir dir="${tar.dir}/tests"/>
     <mkdir dir="${tar.dir}/drivers"/>
@@ -144,7 +170,9 @@
     <mkdir dir="${tar.dir}/libexec/PigTest/test"/>
     <mkdir dir="${tar.dir}/libexec/PigTest/generate"/>
     <mkdir dir="${tar.dir}/libexec/python"/>
+    <mkdir dir="${tar.dir}/libexec/js"/>
     <mkdir dir="${tar.dir}/libexec/ruby"/>
+    <mkdir dir="${tar.dir}/libexec/groovy"/>
     <mkdir dir="${tar.dir}/libexec/cpython"/>
     <mkdir dir="${tar.dir}/lib"/>
     <mkdir dir="${tar.dir}/lib/java"/>
@@ -193,10 +221,18 @@
       <fileset dir="${python.udf.dir}"/>
     </copy>
 
+    <copy todir="${tar.dir}/libexec/js">
+      <fileset dir="${js.udf.dir}"/>
+    </copy>
+
     <copy todir="${tar.dir}/libexec/ruby">
       <fileset dir="${ruby.udf.dir}"/>
     </copy>
 
+    <copy todir="${tar.dir}/libexec/groovy">
+      <fileset dir="${groovy.udf.dir}"/>
+    </copy>
+
     <copy todir="${tar.dir}/libexec/cpython">
       <fileset dir="${cpython.udf.dir}"/>
     </copy>
@@ -290,22 +326,10 @@
       <env key="FORK_FACTOR_FILE" value="${fork.factor.conf.file}"/>
       <env key="HADOOP_MAPRED_LOCAL_DIR" value="${hadoop.mapred.local.dir}"/>
       <env key="E2E_DEBUG" value="${e2e.debug}"/>
+      <env key="SORT_BENCHMARKS" value="${sort.benchmarks}"/>
 
       <arg value="./test_harness.pl"/>
-      <arg line="${tests.to.run}"/>
-      <arg value="${test.location}/tests/cmdline.conf"/>
-      <arg value="${test.location}/tests/multiquery.conf"/>
-      <arg value="${test.location}/tests/negative.conf"/>
-      <arg value="${test.location}/tests/nightly.conf"/>
-      <arg value="${test.location}/tests/streaming.conf"/>
-      <arg value="${test.location}/tests/streaming_local.conf"/>
-      <arg value="${test.location}/tests/turing_jython.conf"/>
-      <arg value="${test.location}/tests/bigdata.conf"/>
-      <arg value="${test.location}/tests/grunt.conf"/>
-      <arg value="${test.location}/tests/macro.conf"/>
-      <arg value="${test.location}/tests/orc.conf"/>
-      <arg value="${test.location}/tests/hcat.conf"/>
-      <arg value="${test.location}/tests/utf8.conf"/>
+      <arg line="${tests.to.run} ${tests.suites}"/>
     </exec>
   </target>
 
@@ -404,6 +428,12 @@
     <ant dir="${udf.java.dir}" target="clean"/>
   </target>
 
+  <target name="download-datafu" description="To download datafu" unless="offline">
+    <mkdir dir="lib/java"/>
+    <get src="${mvnrepo}/com/linkedin/datafu/datafu/1.2.0/datafu-1.2.0.jar"
+        dest="lib/java/datafu.jar"/>
+  </target>
+
 </project>
 
 

Modified: pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm (original)
+++ pig/branches/spark/test/e2e/pig/drivers/TestDriverPig.pm Fri Mar  4 18:17:39 2016
@@ -595,8 +595,8 @@ sub postProcessSingleOutputFile
     # Build command to:
     # 1. Combine part files
     my $fppCmd;
-    if(Util::isWindows()) {
-        my $delCmd = "del \"$localdir\\*.crc\"";
+    if(Util::isWindows()||Util::isCygwin()) {
+        my $delCmd = "del \"$localdir\\*.crc\" 2>NUL";
         print $log "$delCmd\n";
         system($delCmd);
         $fppCmd = "cat $localdir\\map* $localdir\\part* 2>NUL";
@@ -614,6 +614,11 @@ sub postProcessSingleOutputFile
     
     $fppCmd .= " > $localdir/out_original";
     
+    #Need slashes to be consistent for windows
+    if (Util::isWindows() || Util::isCygwin()) {
+        $fppCmd =~ s/\\/\//g;
+    }
+    
     # run command
     print $log "$fppCmd\n";
     system($fppCmd);
@@ -623,6 +628,25 @@ sub postProcessSingleOutputFile
     print $log join(" ", @sortCmd) . "\n";
     IPC::Run::run(\@sortCmd, '>', "$localdir/out_sorted") or die "Sort for benchmark comparison failed on $localdir/out_original";
 
+    # Remove extra \r from $localdir/out_sorted for Windows benchmark
+    if(Util::isWindows()||Util::isCygwin()) {
+        my $tmpfile = "$localdir/out_sorted.tmp";
+        link("$localdir/out_sorted", $tmpfile) or
+            die "Unable to create temporary file $tmpfile, $!\n";
+        unlink("$localdir/out_sorted") or
+            die "Unable to unlink file $localdir/out_sorted, $!\n";
+        open(IFH, "< $tmpfile") or
+            die "Unable to open file $tmpfile, $!\n";
+        open(OFH, "> $localdir/out_sorted") or
+            die "Unable to open file $localdir/out_sorted, $!\n";
+        while(<IFH>) {
+            $_ =~ s/\r$//g;
+            print OFH $_;
+        }
+        close(OFH);
+        close(IFH);
+        unlink($tmpfile);
+    }
     return "$localdir/out_sorted";
 }
 
@@ -936,6 +960,17 @@ sub compareSingleOutput
 {
     my ($self, $testResult, $testOutput, $benchmarkOutput, $log) = @_;
 
+    if ($ENV{'SORT_BENCHMARKS'} eq 'true'){
+        # Sort the benchmark Output.
+        my $benchmarkOutput_new = $benchmarkOutput.'_new';
+        my @sortCmd = ('sort', "$benchmarkOutput");
+        print $log join(" ", @sortCmd) . "\n";
+        IPC::Run::run(\@sortCmd, '>', "$benchmarkOutput_new") or die "Sort for benchmark output failed on $benchmarkOutput_new";
+        my @renameCmd = ('mv', "$benchmarkOutput_new" , "$benchmarkOutput");
+        print $log join(" ", @renameCmd) . "\n";
+        IPC::Run::run(\@renameCmd, \undef, $log, $log) or die "Rename command failed";
+    }
+
     # cksum the two files to see if they are the same
     my ($testChksm, $benchmarkChksm);
     IPC::Run::run((['cat', $testOutput], '|', ['cksum']), \$testChksm,

Modified: pig/branches/spark/test/e2e/pig/tests/hcat.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/hcat.conf?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/hcat.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/hcat.conf Fri Mar  4 18:17:39 2016
@@ -44,7 +44,7 @@ stored as textfile;\,
 			'num' => 2,
 			'java_params' => ['-Dhcat.bin=:HCATBIN:'],
 			'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
-sql drop table if exists pig_hcat_ddl_1;
+SQL drop table if exists pig_hcat_ddl_1;
 sql create table pig_hcat_ddl_1(name string,
 age int,
 gpa double)

Modified: pig/branches/spark/test/e2e/pig/tests/multiquery.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/multiquery.conf?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/multiquery.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/multiquery.conf Fri Mar  4 18:17:39 2016
@@ -52,6 +52,21 @@
 # - _TEST_ Streaming with multiple stores.
 # - _TEST_ Streaming in demux.
 # - _TEST_ Streaming in nested demux.
+# MultiQuery_Union (Also refer Union in nightly.conf)
+# - _TEST_ Multiple levels of union with join
+# - _TEST_ Union with replicate join left table part of split
+# - _TEST_ Union with replicate join right table part of split
+# - _TEST_ Union with skewed join left table part of split
+# - _TEST_ Union with skewed join right table part of split
+# - _TEST_ Union with group by + combiner
+# - _TEST_ Union with group by + secondary key partitioner
+# - _TEST_ Union with order by
+# MultiQuery_Self
+# - _TEST_ Self cross
+# - _TEST_ Self cogroup
+# - _TEST_ Three way join (two self)
+# - _TEST_ Self replicate join
+# - _TEST_ Self skewed join
 
 
 $cfg = {
@@ -554,7 +569,301 @@ $cfg = {
             },
             ] # end of tests
         },
+        
+        {
+        'name' => 'MultiQuery_Union',
+        'tests' => [
+            { 
+            # Union + Groupby + Combiner
+            'num' => 1,
+            'floatpostprocess' => 1,
+            'java_params' => ['-Dpig.exec.mapPartAgg=false'], 
+            'delimiter' => '    ',
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = group c by name;
+e = foreach d generate group, SUM(c.age);
+store e into ':OUTPATH:';\,
+            },
+            { 
+            # Union + Groupby + Combiner + POPartialAgg
+            'num' => 2,
+            'floatpostprocess' => 1,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true'], 
+            'delimiter' => '    ',
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+c = union a1, a2;
+d = group c by name;
+e = foreach d generate group, SUM(c.age);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Union + Replicate Join left outer + Stream + Group by + Secondary Key Partitioner
+            'num' => 3,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float);
+a1 = filter a by gpa is null or gpa >= 3.9;
+a2 = filter a by gpa < 2;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+d = join b by name left outer, c by name using 'replicated';
+e = stream d through `cat` as (name, age, gpa, name1, age1, registration, contributions);
+f = foreach e generate name, age, gpa, registration, contributions;
+g = group f by name;
+g1 = group f by name; -- Two separate groupbys to ensure secondary key partitioner
+h = foreach g { 
+    inner1 = order f by age, gpa, registration, contributions;
+    inner2 = limit inner1 1;
+    generate inner2, SUM(f.age); };
+i = foreach g1 {
+    inner1 = order f by age asc, gpa desc, registration asc, contributions desc;
+    inner2 = limit inner1 1;
+    generate inner2, SUM(f.age); };
+store h into ':OUTPATH:.1';
+store i into ':OUTPATH:.2';\,
+            },
+            {
+            # Union + Replicate Join inner + Order by
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:float);
+a1 = filter a by gpa is null or gpa >= 3.9;
+a2 = filter a by gpa < 1;
+b = union a1, a2;
+b1 = filter b by age < 30;
+b2 = foreach b generate name, age, FLOOR(gpa) as gpa;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+d = join b2 by name, c by name using 'replicated';
+e = foreach d generate b2::name as name, b2::age as age, gpa, registration, contributions;
+f = order e by name, age DESC;
+store f into ':OUTPATH:';\,
+            'sortArgs' => ['-t', '	', '-k', '1,1', '-k', '2,2nr'],
+            },
+            {
+            # Union + Replicate Join right input
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+a1 = filter a by gpa is null or gpa <= 3.9;
+a2 = filter a by gpa < 2;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+d = join c by name, b by name using 'replicated';
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Union + Left outer Join
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa is null or gpa >= 3.9;
+a2 = filter a by gpa < 1;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+d = join b by name left outer, c by name;
+e = foreach d generate b::name as name, b::age as age, gpa, registration, contributions;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Multiple levels of union + Skewed join Right outer
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+b = filter a by gpa >= 3.9;
+b1 = foreach b generate *;
+b2 = foreach b generate *;
+b3 = union onschema b1, b2;
+c = filter a by gpa < 2;
+c1 = foreach c generate *;
+c2 = foreach c generate *;
+c3 = union onschema c1, c2;
+a1 = union onschema b3, c3;
+store a1 into ':OUTPATH:.1';
+d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+e = join a1 by name right outer, d by name using 'skewed' PARALLEL 3;
+store e into ':OUTPATH:.2';\,
+            },
+            {
+            # Union + Skewed Join right input
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+a1 = filter a by gpa >= 3.9;
+a2 = filter a by gpa < 2;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+d = join c by name, b by name using 'skewed' PARALLEL 3;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Union + CROSS
+            'num' => 9,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+d = CROSS b, c;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Union + Rank
+            'num' => 10,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa is null or gpa >= 3.9;
+a2 = filter a by gpa < 1;
+b = union a1, a2;
+c = rank b;  
+-- Ordering is not guaranteed with union and ranking will differ. So just test rank and column separately
+d = foreach c generate $0;
+e = foreach c generate $1, $2, $3;
+store d into ':OUTPATH:.1';
+store e into ':OUTPATH:.2';\,
+            },
+            {
+            # Union + Rank dense
+            'num' => 11,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa is null or gpa >= 3.9;
+a2 = filter a by gpa < 1;
+b = union a1, a2;
+c = rank b by name ASC, age DESC DENSE;  
+store c into ':OUTPATH:';\,
+            },
+            ] # end of tests
+        },
+        
+        {
+        'name' => 'MultiQuery_Self',
+        'tests' => [
+            # Self cross
+            {
+            'num' => 1,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3.9;
+c = filter a by gpa <= 0.5;
+d = filter a by gpa >= 3.5 and gpa < 3.9;
+e = filter a by gpa > 0.5 and gpa < 1;
+f = CROSS b, c PARALLEL 3;
+g = CROSS d, e PARALLEL 4;
+store f into ':OUTPATH:.1';
+store g into ':OUTPATH:.2';\,
+            },
+            {
+            # Self cogroup
+            'num' => 2,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = cogroup c by name, b by name;
+e = foreach d generate flatten(c), flatten(b);
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Three way join (two self)
+            'num' => 3,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = load ':INPATH:/singlefile/voternulltab10k' as (name, age, registration, contributions);
+e = join b by name, c by name, d by name PARALLEL 2;
+store e into ':OUTPATH:';\,
+            },
+            {
+            # Self join replicated
+            'num' => 4,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name, b by name using 'replicated';
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join skewed
+            'num' => 5,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name, b by name using 'skewed' PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join left outer
+            'num' => 6,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name left outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join right outer
+            'num' => 7,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name right outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join full outer
+            'num' => 8,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age, gpa);
+b = filter a by gpa >= 3;
+c = filter a by gpa < 2;
+d = join c by name full outer, b by name PARALLEL 2;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Self join union replicated
+            'num' => 9,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = JOIN a by name, b by name using 'replicated';
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Self join union
+            'num' => 10,
+            'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age, gpa:float);
+a1 = filter a by gpa == 0.00;
+a2 = filter a by gpa == 4.00;
+b = union a1, a2;
+c = JOIN a by name left, b by name;
+store c into ':OUTPATH:';\,
+            },
+            {
+            # Complex self join
+            'num' => 11,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:float);
+SPLIT a INTO b IF age > 40,
+             c IF age <= 40;
+
+d = FOREACH c GENERATE name, age, gpa;
+
+e = FILTER d BY gpa > 3;
+f = FILTER d BY gpa <= 3;
+
+g = JOIN e BY name LEFT, f BY name;
+h = FOREACH g GENERATE e::name as name, e::age as age, e::gpa as gpa;
+
+i = DISTINCT h;
+
+j = FILTER b BY gpa > 3;
+k = FILTER b by gpa <= 3;
+
+l = JOIN j BY name LEFT, k BY name;
+m = FOREACH l generate j::name as name, j::age as age, j::gpa as gpa;
+n = DISTINCT m;
+
+m = UNION e, i, j, n;
+
+n = JOIN a BY name, m BY name;
+store n into ':OUTPATH:';\,
+            }
+            ] # end of tests
+        },
 
     ] # end of groups
 }
-;
\ No newline at end of file
+;


