pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1711569 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/ backend/hadoop/executionengine/spark/converter/ tools/pigstats/ tools/pigstats/spark/
Date: Sat, 31 Oct 2015 02:33:28 GMT
Author: xuefu
Date: Sat Oct 31 02:33:28 2015
New Revision: 1711569

URL: http://svn.apache.org/viewvc?rev=1711569&view=rev
Log:
PIG-4634: Fix records count issues in output statistics (Xianda via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

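At a glance, the change threads a per-store Spark counter from the driver through the map
function that feeds each store, and reads the value back when output statistics are assembled.
The sketch below is illustrative only, condensed from the diffs that follow; the POStore
variable "store" stands in for the operator handled at each call site:

    // SparkLauncher (driver): register a fresh SparkCounters instance for this run
    SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext));

    // StoreConverter (plan conversion): create a named counter for each non-temporary store
    SparkPigStatusReporter.getInstance().createCounter(
            SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
            SparkStatsUtil.getStoreSparkCounterName(store));

    // FromTupleFunction.call (executors): bump the counter once per record written
    sparkCounters.increment(counterGroupName, counterName, 1L);

    // SparkJobStats.addOutputInfo (driver): read the count back for OutputStats
    long recordsCount = SparkStatsUtil.getStoreSparkCounterValue(store);
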
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Sat Oct 31 02:33:28 2015
@@ -109,7 +109,9 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.JarManager;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
 import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -149,7 +151,7 @@ public class SparkLauncher extends Launc
             explain(sparkplan, System.out, "text", true);
         SparkPigStats sparkStats = (SparkPigStats) pigContext
                 .getExecutionEngine().instantiatePigStats();
-        sparkStats.initialize(sparkplan);
+        sparkStats.initialize(pigContext, sparkplan);
         PigStats.start(sparkStats);
 
         startSparkIfNeeded(pigContext);
@@ -178,6 +180,8 @@ public class SparkLauncher extends Launc
 
         byte[] confBytes = KryoSerializer.serializeJobConf(jobConf);
 
+        SparkPigStatusReporter.getInstance().setCounters(new SparkCounters(sparkContext));
+
         // Create conversion map, mapping between pig operator and spark convertor
         Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap
                 = new HashMap<Class<? extends PhysicalOperator>, RDDConverter>();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Sat Oct 31 02:33:28 2015
@@ -21,6 +21,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import scala.Tuple2;
 
 import org.apache.commons.logging.Log;
@@ -68,9 +72,13 @@ public class StoreConverter implements
             POStore op) throws IOException {
         SparkUtil.assertPredecessorSize(predecessors, op, 1);
         RDD<Tuple> rdd = predecessors.get(0);
+
+        SparkPigStatusReporter.getInstance().createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
+                SparkStatsUtil.getStoreSparkCounterName(op));
+
         // convert back to KV pairs
         JavaRDD<Tuple2<Text, Tuple>> rddPairs = rdd.toJavaRDD().map(
-                new FromTupleFunction());
+                buildFromTupleFunction(op));
 
         PairRDDFunctions<Text, Tuple> pairRDDFunctions = new PairRDDFunctions<Text, Tuple>(
                 rddPairs.rdd(), SparkUtil.getManifest(Text.class),
@@ -103,6 +111,7 @@ public class StoreConverter implements
         return retRdd;
     }
 
+
     private static POStore configureStorer(JobConf jobConf,
             PhysicalOperator op) throws IOException {
         ArrayList<POStore> storeLocations = Lists.newArrayList();
@@ -125,9 +134,39 @@ public class StoreConverter implements
             Function<Tuple, Tuple2<Text, Tuple>> {
 
         private static Text EMPTY_TEXT = new Text();
+        private String counterGroupName;
+        private String counterName;
+        private SparkCounters sparkCounters;
+
 
         public Tuple2<Text, Tuple> call(Tuple v1) {
+            if (sparkCounters != null) {
+                sparkCounters.increment(counterGroupName, counterName, 1L);
+            }
             return new Tuple2<Text, Tuple>(EMPTY_TEXT, v1);
         }
+
+        public void setCounterGroupName(String counterGroupName) {
+            this.counterGroupName = counterGroupName;
+        }
+
+        public void setCounterName(String counterName) {
+            this.counterName = counterName;
+        }
+
+        public void setSparkCounters(SparkCounters sparkCounter) {
+            this.sparkCounters = sparkCounter;
+        }
+    }
+
+    private FromTupleFunction buildFromTupleFunction(POStore op) {
+        FromTupleFunction ftf = new FromTupleFunction();
+        if (!op.isTmpStore()) {
+            ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
+            ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op));
+            SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
+            ftf.setSparkCounters(counterReporter.getCounters());
+        }
+        return ftf;
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/PigStatsUtil.java Sat Oct 31 02:33:28 2015
@@ -160,7 +160,7 @@ public class PigStatsUtil {
     private static final String SEPARATOR = "/";
     private static final String SEMICOLON = ";";
 
-    private static String getShortName(String uri) {
+    public static String getShortName(String uri) {
         int scolon = uri.indexOf(SEMICOLON);
         int slash;
         if (scolon!=-1) {

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Sat Oct 31 02:33:28 2015
@@ -21,6 +21,7 @@ package org.apache.pig.tools.pigstats.sp
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import scala.Option;
 
 import com.google.common.collect.Maps;
@@ -57,17 +58,19 @@ public class SparkJobStats extends JobSt
     public void addOutputInfo(POStore poStore, boolean success,
                               JobMetricsListener jobMetricsListener,
                               Configuration conf) {
-        // TODO: Compute #records
-        long bytes = getOutputSize(poStore, conf);
-        OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
-                bytes, 1, success);
-        outputStats.setPOStore(poStore);
-        outputStats.setConf(conf);
         if (!poStore.isTmpStore()) {
+            long bytes = getOutputSize(poStore, conf);
+            long recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
+            OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
+                    bytes, recordsCount, success);
+            outputStats.setPOStore(poStore);
+            outputStats.setConf(conf);
+
             outputs.add(outputStats);
         }
     }
 
+
     public void collectStats(JobMetricsListener jobMetricsListener) {
         if (jobMetricsListener != null) {
             Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Sat Oct 31 02:33:28 2015
@@ -31,6 +31,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -49,8 +50,9 @@ public class SparkPigStats extends PigSt
         this.sparkScriptState = (SparkScriptState) ScriptState.get();
     }
 
-    public void initialize(SparkOperPlan sparkPlan){
+    public void initialize(PigContext pigContext, SparkOperPlan sparkPlan){
         super.start();
+        this.pigContext = pigContext;
         sparkScriptState.setScriptInfo(sparkPlan);
     }
 
@@ -133,17 +135,6 @@ public class SparkPigStats extends PigSt
     }
 
     @Override
-    public Properties getPigProperties() {
-        return null;
-    }
-
-    @Override
-    public String getOutputAlias(String location) {
-        // TODO
-        return null;
-    }
-
-    @Override
     public long getSMMSpillCount() {
         throw new UnsupportedOperationException();
     }
@@ -159,18 +150,6 @@ public class SparkPigStats extends PigSt
     }
 
     @Override
-    public long getBytesWritten() {
-        // TODO
-        return 0;
-    }
-
-    @Override
-    public long getRecordWritten() {
-        // TODO
-        return 0;
-    }
-
-    @Override
     public int getNumberJobs() {
         return jobPlan.size();
     }

Added: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java?rev=1711569&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java (added)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStatusReporter.java Sat Oct 31 02:33:28 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.tools.pigstats.spark;
+
+import org.apache.pig.JVMReuseManager;
+import org.apache.pig.StaticDataCleanup;
+
+/**
+ * Just like PigStatusReporter which will create/reset Hadoop counters, SparkPigStatusReporter will
+ * create/reset Spark counters.
+ * Note that, it is not suitable to make SparkCounters as a Singleton, it will be created/reset for
+ * a given pig script or a Dump/Store action in Grunt mode.
+ */
+public class SparkPigStatusReporter {
+    private static SparkPigStatusReporter reporter;
+    private SparkCounters counters;
+
+    static {
+        JVMReuseManager.getInstance().registerForStaticDataCleanup(SparkPigStatusReporter.class);
+    }
+
+    @StaticDataCleanup
+    public static void staticDataCleanup() {
+        reporter = null;
+    }
+
+    private SparkPigStatusReporter() {
+    }
+
+    public static SparkPigStatusReporter getInstance() {
+        if (reporter == null) {
+            reporter = new SparkPigStatusReporter();
+        }
+        return reporter;
+    }
+
+    public void createCounter(String groupName, String counterName) {
+        if (counters != null) {
+            counters.createCounter(groupName, counterName);
+        }
+    }
+
+    public SparkCounters getCounters() {
+        return counters;
+    }
+
+    public void setCounters(SparkCounters counters) {
+        this.counters = counters;
+    }
+}

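For context, the reporter added above follows the existing PigStatusReporter pattern: the
reporter itself is a JVM-wide singleton (registered with JVMReuseManager so it is cleared on
JVM reuse), while the SparkCounters it wraps is swapped in per script run. Call order matters,
as this hedged driver-side sketch shows; "sparkContext" is assumed to be a live
JavaSparkContext and the counter name is illustrative:

    SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();

    // Until setCounters() is called, createCounter() is a silent no-op (counters is still null),
    // so SparkLauncher installs the counters before any converter runs.
    reporter.setCounters(new SparkCounters(sparkContext));

    // Converters may now register counters that executors will increment later.
    reporter.createCounter(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP,
            "Output records in _0_scope-35_result");
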
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1711569&r1=1711568&r2=1711569&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Sat Oct 31 02:33:28 2015
@@ -22,12 +22,16 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.spark.JobExecutionStatus;
 import org.apache.spark.SparkJobInfo;
 import org.apache.spark.api.java.JavaSparkContext;
 
 public class SparkStatsUtil {
 
+    public static final String SPARK_STORE_COUNTER_GROUP = "SparkStoreCounters";
+    public static final String SPARK_STORE_RECORD_COUNTER = "Output records in ";
+
   public static void waitForJobAddStats(int jobID,
                                         POStore poStore, SparkOperator sparkOperator,
                                         JobMetricsListener jobMetricsListener,
@@ -58,7 +62,25 @@ public class SparkStatsUtil {
                 sparkContext, jobConf, e);
     }
 
-  public static boolean isJobSuccess(int jobID,
+    public static String getStoreSparkCounterName(POStore store) {
+        String shortName = PigStatsUtil.getShortName(store.getSFile().getFileName());
+
+        StringBuffer sb = new StringBuffer(SPARK_STORE_RECORD_COUNTER);
+        sb.append("_");
+        sb.append(store.getIndex());
+        sb.append("_");
+        sb.append(store.getOperatorKey());
+        sb.append("_");
+        sb.append(shortName);
+        return sb.toString();
+    }
+
+    public static long getStoreSparkCounterValue(POStore store) {
+        SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
+        return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store));
+    }
+
+    public static boolean isJobSuccess(int jobID,
                                     JavaSparkContext sparkContext) {
       JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
       if (status == JobExecutionStatus.SUCCEEDED) {

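As a worked example of the counter naming above (values are illustrative): for a store whose
SFile points at hdfs://namenode/out/result, with index 0 and operator key scope-35,
getShortName() yields "result" and getStoreSparkCounterName() produces
"Output records in _0_scope-35_result". A tiny self-contained snippet reproducing the same
concatenation:

    public class CounterNameExample {
        public static void main(String[] args) {
            String prefix = "Output records in ";  // SPARK_STORE_RECORD_COUNTER
            String shortName = "result";           // PigStatsUtil.getShortName() of the store path
            String name = prefix + "_" + 0 + "_" + "scope-35" + "_" + shortName;
            System.out.println(name);              // prints: Output records in _0_scope-35_result
        }
    }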

