pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject svn commit: r1795482 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark: SparkPigRecordReader.java SparkPigSplit.java
Date Thu, 18 May 2017 06:31:43 GMT
Author: zly
Date: Thu May 18 06:31:43 2017
New Revision: 1795482

URL: http://svn.apache.org/viewvc?rev=1795482&view=rev
Log:
PIG-5135:HDFS bytes read stats are always 0 in Spark mode-2(Adam via Liyun)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java?rev=1795482&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java	(added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java	Thu May 18 06:31:43 2017
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * Record reader for Spark mode - handles SparkPigSplit
+ */
+public class SparkPigRecordReader extends PigRecordReader {
+
+
+    /**
+     * @param inputformat
+     * @param pigSplit
+     * @param loadFunc
+     * @param context
+     * @param limit
+     */
+    public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFunc
loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException
{
+        super(inputformat, pigSplit, loadFunc, context, limit);
+    }
+
+    @Override
+    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException,
InterruptedException {
+        SparkPigSplit sparkPigSplit = (SparkPigSplit)split;
+        super.initialize(sparkPigSplit.getWrappedPigSplit(), context);
+    }
+}

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java?rev=1795482&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java	(added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigSplit.java	Thu May 18 06:31:43 2017
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.spark;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.SplitLocationInfo;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+
+/**
+ * Wrapper class for PigSplits in Spark mode
+ *
+ * Spark only counts HDFS bytes read if we provide a FileSplit, so we have to wrap PigSplits
and have the wrapper
+ * class extend FileSplit
+ */
+public interface SparkPigSplit extends Writable, Configurable, Serializable {
+
+   InputSplit getWrappedSplit();
+
+   InputSplit getWrappedSplit(int idx);
+
+   SplitLocationInfo[] getLocationInfo() throws IOException;
+
+   long getLength(int idx) throws IOException, InterruptedException;
+
+   int getSplitIndex();
+
+   void setMultiInputs(boolean b);
+
+   boolean isMultiInputs();
+
+   int getNumPaths();
+
+   void setDisableCounter(boolean disableCounter);
+
+   boolean disableCounter();
+
+   void setCurrentIdx(int idx);
+
+   PigSplit getWrappedPigSplit();
+
+   public static class FileSparkPigSplit extends FileSplit implements SparkPigSplit {
+
+       private PigSplit pigSplit;
+
+       /**
+        * Spark executor's deserializer calls this, and we have to instantiate a default
wrapped object
+        */
+       public FileSparkPigSplit () {
+           pigSplit = new PigSplit();
+       }
+
+       public FileSparkPigSplit(PigSplit pigSplit) {
+           this.pigSplit = pigSplit;
+       }
+
+       @Override
+       public SplitLocationInfo[] getLocationInfo() throws IOException {
+           return pigSplit.getLocationInfo();
+       }
+
+       @Override
+       public String toString() {
+           return pigSplit.toString();
+       }
+
+       @Override
+       public long getLength() {
+           try {
+               return pigSplit.getLength();
+           } catch (IOException | InterruptedException e) {
+               throw new RuntimeException(e);
+           }
+       }
+
+       @Override
+       public String[] getLocations() throws IOException {
+           try {
+               return pigSplit.getLocations();
+           } catch (InterruptedException e) {
+               throw new RuntimeException(e);
+           }
+       }
+
+       @Override
+       public InputSplit getWrappedSplit() {
+           return pigSplit.getWrappedSplit();
+       }
+
+       @Override
+       public InputSplit getWrappedSplit(int idx) {
+           return pigSplit.getWrappedSplit(idx);
+       }
+
+       @Override
+       public long getLength(int idx) throws IOException, InterruptedException {
+           return pigSplit.getLength(idx);
+       }
+
+       @Override
+       public void readFields(DataInput is) throws IOException {
+           pigSplit.readFields(is);
+       }
+
+       @Override
+       public void write(DataOutput os) throws IOException {
+           pigSplit.write(os);
+       }
+
+       @Override
+       public int getSplitIndex() {
+           return pigSplit.getSplitIndex();
+       }
+
+       @Override
+       public void setMultiInputs(boolean b) {
+           pigSplit.setMultiInputs(b);
+       }
+
+       @Override
+       public boolean isMultiInputs() {
+           return pigSplit.isMultiInputs();
+       }
+
+       @Override
+       public Configuration getConf() {
+           return pigSplit.getConf();
+       }
+
+       @Override
+       public void setConf(Configuration conf) {
+           pigSplit.setConf(conf);
+       }
+
+       @Override
+       public int getNumPaths() {
+           return pigSplit.getNumPaths();
+       }
+
+       @Override
+       public void setDisableCounter(boolean disableCounter) {
+           pigSplit.setDisableCounter(disableCounter);
+       }
+
+       @Override
+       public boolean disableCounter() {
+           return pigSplit.disableCounter();
+       }
+
+       @Override
+       public void setCurrentIdx(int idx) {
+           pigSplit.setCurrentIdx(idx);
+       }
+
+       @Override
+       public PigSplit getWrappedPigSplit() {
+           return this.pigSplit;
+       }
+
+       @Override
+       public Path getPath() {
+           return ((FileSplit)getWrappedPigSplit().getWrappedSplit()).getPath();
+       }
+   }
+
+    public static class GenericSparkPigSplit extends InputSplit implements SparkPigSplit
{
+
+        private static final long serialVersionUID = 1L;
+
+        private PigSplit pigSplit;
+
+        /**
+         * Spark executor's deserializer calls this, and we have to instantiate a default
wrapped object
+         */
+        public GenericSparkPigSplit() {
+            pigSplit = new PigSplit();
+        }
+
+        public GenericSparkPigSplit(PigSplit pigSplit) {
+            this.pigSplit = pigSplit;
+        }
+
+        @Override
+        public SplitLocationInfo[] getLocationInfo() throws IOException {
+            return pigSplit.getLocationInfo();
+        }
+
+        @Override
+        public String toString() {
+            return pigSplit.toString();
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return pigSplit.getLength();
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, InterruptedException {
+            return pigSplit.getLocations();
+        }
+
+        @Override
+        public InputSplit getWrappedSplit() {
+            return pigSplit.getWrappedSplit();
+        }
+
+        @Override
+        public InputSplit getWrappedSplit(int idx) {
+            return pigSplit.getWrappedSplit(idx);
+        }
+
+        @Override
+        public long getLength(int idx) throws IOException, InterruptedException {
+            return pigSplit.getLength(idx);
+        }
+
+        @Override
+        public void readFields(DataInput is) throws IOException {
+            pigSplit.readFields(is);
+        }
+
+        @Override
+        public void write(DataOutput os) throws IOException {
+            pigSplit.write(os);
+        }
+
+        @Override
+        public int getSplitIndex() {
+            return pigSplit.getSplitIndex();
+        }
+
+        @Override
+        public void setMultiInputs(boolean b) {
+            pigSplit.setMultiInputs(b);
+        }
+
+        @Override
+        public boolean isMultiInputs() {
+            return pigSplit.isMultiInputs();
+        }
+
+        @Override
+        public Configuration getConf() {
+            return pigSplit.getConf();
+        }
+
+        @Override
+        public void setConf(Configuration conf) {
+            pigSplit.setConf(conf);
+        }
+
+        @Override
+        public int getNumPaths() {
+            return pigSplit.getNumPaths();
+        }
+
+        @Override
+        public void setDisableCounter(boolean disableCounter) {
+            pigSplit.setDisableCounter(disableCounter);
+        }
+
+        @Override
+        public boolean disableCounter() {
+            return pigSplit.disableCounter();
+        }
+
+        @Override
+        public void setCurrentIdx(int idx) {
+            pigSplit.setCurrentIdx(idx);
+        }
+
+        @Override
+        public PigSplit getWrappedPigSplit() {
+            return this.pigSplit;
+        }
+    }
+
+}



Mime
View raw message