parquet-commits mailing list archives

From b...@apache.org
Subject parquet-mr git commit: PARQUET-397: Implement Pig predicate pushdown
Date Fri, 26 Feb 2016 18:28:19 GMT
Repository: parquet-mr
Updated Branches:
  refs/heads/master c44f982e8 -> fb46b941f


PARQUET-397: Implement Pig predicate pushdown

This is based on #296 from @danielcweeks and addresses a few remaining review items.
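
Predicate pushdown is opt-in. A minimal usage sketch, assuming the property name and PigServer setup shown in the new test below:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    // Sketch only: enable the opt-in property before constructing PigServer;
    // Pig then hands the FILTER expression to ParquetLoader via
    // setPushdownPredicate, which converts it to a Parquet FilterPredicate.
    Configuration conf = new Configuration();
    conf.setBoolean("parquet.pig.predicate.pushdown.enable", true);
    PigServer pig = new PigServer(ExecType.LOCAL, conf);
    pig.registerQuery("B = LOAD 'data' USING org.apache.parquet.pig.ParquetLoader();");
    pig.registerQuery("C = FILTER B BY c1 == 1 OR c1 == 5;");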

Closes #296.

Author: Daniel Weeks <dweeks@netflix.com>
Author: Ryan Blue <blue@apache.org>

Closes #331 from rdblue/PARQUET-397-pig-predicate-pushdown and squashes the following commits:

c7a9b02 [Ryan Blue] PARQUET-397: Address review comments.
54e23a6 [Ryan Blue] PARQUET-397: Update Pig PPD to throw for bad expressions.
388099b [Daniel Weeks] Cleaning up imports
6b405b4 [Daniel Weeks] Merge remote-tracking branch 'rdblue/pig-predicate-pushdown' into pig-predicate-pushdown
f1ef73e [Daniel Weeks] Fixed binary type and storing filter predicate
a39fdff [Ryan Blue] WIP: Handle a few error cases in Pig predicate pushdown.
2666849 [Daniel Weeks] Fixed test to check the actual number of materialized rows from the reader
7b019a6 [Daniel Weeks] update tests and logging
f8ca447 [Daniel Weeks] Add predicate pushdown using filter2 api


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/fb46b941
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/fb46b941
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/fb46b941

Branch: refs/heads/master
Commit: fb46b941f7763314d667c437c06b1675e61c3d38
Parents: c44f982
Author: Daniel Weeks <dweeks@netflix.com>
Authored: Fri Feb 26 10:28:07 2016 -0800
Committer: Ryan Blue <blue@apache.org>
Committed: Fri Feb 26 10:28:07 2016 -0800

----------------------------------------------------------------------
 .../org/apache/parquet/pig/ParquetLoader.java   | 186 ++++++++++++++++++-
 .../apache/parquet/pig/TestParquetLoader.java   |  57 ++++--
 pom.xml                                         |   4 +-
 3 files changed, 232 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/fb46b941/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
index 0575dce..be54aa8 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/ParquetLoader.java
@@ -30,9 +30,13 @@ import static org.apache.parquet.pig.TupleReadSupport.PARQUET_PIG_REQUIRED_FIELD
 import static org.apache.parquet.pig.TupleReadSupport.PARQUET_COLUMN_INDEX_ACCESS;
 import static org.apache.parquet.pig.TupleReadSupport.getPigSchemaFromMultipleFiles;
 
+import static org.apache.parquet.filter2.predicate.FilterApi.*;
+
 import java.io.IOException;
 import java.lang.ref.Reference;
 import java.lang.ref.SoftReference;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.WeakHashMap;
@@ -42,9 +46,17 @@ import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;
+import org.apache.parquet.filter2.predicate.Operators;
+import org.apache.parquet.io.api.Binary;
 import org.apache.pig.Expression;
+import org.apache.pig.Expression.BetweenExpression;
+import org.apache.pig.Expression.InExpression;
+import org.apache.pig.Expression.UnaryExpression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.LoadPredicatePushdown;
 import org.apache.pig.LoadPushDown;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
@@ -57,6 +69,11 @@ import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.parser.ParserException;
 
+import static org.apache.pig.Expression.BinaryExpression;
+import static org.apache.pig.Expression.Column;
+import static org.apache.pig.Expression.Const;
+import static org.apache.pig.Expression.OpType;
+
 import org.apache.parquet.Log;
 import org.apache.parquet.hadoop.ParquetInputFormat;
 import org.apache.parquet.hadoop.metadata.GlobalMetaData;
@@ -70,9 +87,12 @@ import org.apache.parquet.io.ParquetDecodingException;
  * @author Julien Le Dem
  *
  */
-public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDown, LoadPredicatePushdown {
   private static final Log LOG = Log.getLog(ParquetLoader.class);
 
+  public static final String ENABLE_PREDICATE_FILTER_PUSHDOWN = "parquet.pig.predicate.pushdown.enable";
+  private static final boolean DEFAULT_PREDICATE_PUSHDOWN_ENABLED = false;
+
   // Using a weak hash map will ensure that the cache will be gc'ed when there is memory pressure
   static final Map<String, Reference<ParquetInputFormat<Tuple>>> inputFormatCache = new WeakHashMap<String, Reference<ParquetInputFormat<Tuple>>>();
 
@@ -172,6 +192,11 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
     getConfiguration(job).set(PARQUET_PIG_SCHEMA, pigSchemaToString(schema));
     getConfiguration(job).set(PARQUET_PIG_REQUIRED_FIELDS, serializeRequiredFieldList(requiredFieldList));
     getConfiguration(job).set(PARQUET_COLUMN_INDEX_ACCESS, Boolean.toString(columnIndexAccess));
+
+    FilterPredicate filterPredicate = (FilterPredicate) getFromUDFContext(ParquetInputFormat.FILTER_PREDICATE);
+    if(filterPredicate != null) {
+      ParquetInputFormat.setFilterPredicate(getConfiguration(job), filterPredicate);
+    }
   }
 
   @Override
@@ -392,4 +417,163 @@ public class ParquetLoader extends LoadFunc implements LoadMetadata, LoadPushDow
     return s;
   }
 
+  @Override
+  public List<String> getPredicateFields(String s, Job job) throws IOException {
+    if(!job.getConfiguration().getBoolean(ENABLE_PREDICATE_FILTER_PUSHDOWN, DEFAULT_PREDICATE_PUSHDOWN_ENABLED)) {
+      return null;
+    }
+
+    List<String> fields = new ArrayList<String>();
+
+    for(FieldSchema field : schema.getFields()) {
+      switch(field.type) {
+        case DataType.BOOLEAN:
+        case DataType.INTEGER:
+        case DataType.LONG:
+        case DataType.FLOAT:
+        case DataType.DOUBLE:
+        case DataType.CHARARRAY:
+          fields.add(field.alias);
+          break;
+        default:
+          // Skip BYTEARRAY, TUPLE, MAP, BAG, DATETIME, BIGINTEGER, BIGDECIMAL
+          break;
+      }
+    }
+
+    return fields;
+  }
+
+  @Override
+  public List<Expression.OpType> getSupportedExpressionTypes() {
+    OpType supportedTypes [] = {
+        OpType.OP_EQ,
+        OpType.OP_NE,
+        OpType.OP_GT,
+        OpType.OP_GE,
+        OpType.OP_LT,
+        OpType.OP_LE,
+        OpType.OP_AND,
+        OpType.OP_OR,
+        //OpType.OP_BETWEEN, // not implemented in Pig yet
+        //OpType.OP_IN,      // not implemented in Pig yet
+        OpType.OP_NOT
+    };
+
+    return Arrays.asList(supportedTypes);
+  }
+
+  @Override
+  public void setPushdownPredicate(Expression e) throws IOException {
+    LOG.info("Pig pushdown expression: " + e);
+
+    FilterPredicate pred = buildFilter(e);
+    LOG.info("Parquet filter predicate expression: " + pred);
+
+    storeInUDFContext(ParquetInputFormat.FILTER_PREDICATE, pred);
+  }
+
+  private FilterPredicate buildFilter(Expression e) {
+    OpType op = e.getOpType();
+
+    if (e instanceof BinaryExpression) {
+      Expression lhs = ((BinaryExpression) e).getLhs();
+      Expression rhs = ((BinaryExpression) e).getRhs();
+
+      switch (op) {
+        case OP_AND:
+          return and(buildFilter(lhs), buildFilter(rhs));
+        case OP_OR:
+          return or(buildFilter(lhs), buildFilter(rhs));
+        case OP_BETWEEN:
+          BetweenExpression between = (BetweenExpression) rhs;
+          return and(
+              buildFilter(OpType.OP_GE, (Column) lhs, (Const) between.getLower()),
+              buildFilter(OpType.OP_LE, (Column) lhs, (Const) between.getUpper()));
+        case OP_IN:
+          FilterPredicate current = null;
+          for (Object value : ((InExpression) rhs).getValues()) {
+            FilterPredicate next = buildFilter(OpType.OP_EQ, (Column) lhs, (Const) value);
+            if (current != null) {
+              current = or(current, next);
+            } else {
+              current = next;
+            }
+          }
+          return current;
+      }
+
+      if (lhs instanceof Column && rhs instanceof Const) {
+        return buildFilter(op, (Column) lhs, (Const) rhs);
+      } else if (lhs instanceof Const && rhs instanceof Column) {
+        return buildFilter(op, (Column) rhs, (Const) lhs);
+      }
+    } else if (e instanceof UnaryExpression && op == OpType.OP_NOT) {
+      return LogicalInverseRewriter.rewrite(
+          not(buildFilter(((UnaryExpression) e).getExpression())));
+    }
+
+    throw new RuntimeException("Could not build filter for expression: " + e);
+  }
+
+  private FilterPredicate buildFilter(OpType op, Column col, Const value) {
+    String name = col.getName();
+    try {
+      FieldSchema f = schema.getField(name);
+      switch (f.type) {
+        case DataType.BOOLEAN:
+          Operators.BooleanColumn boolCol = booleanColumn(name);
+          switch(op) {
+            case OP_EQ: return eq(boolCol, getValue(value, boolCol.getColumnType()));
+            case OP_NE: return notEq(boolCol, getValue(value, boolCol.getColumnType()));
+            default: throw new RuntimeException(
+                "Operation " + op + " not supported for boolean column: " + name);
+          }
+        case DataType.INTEGER:
+          Operators.IntColumn intCol = intColumn(name);
+          return op(op, intCol, value);
+        case DataType.LONG:
+          Operators.LongColumn longCol = longColumn(name);
+          return op(op, longCol, value);
+        case DataType.FLOAT:
+          Operators.FloatColumn floatCol = floatColumn(name);
+          return op(op, floatCol, value);
+        case DataType.DOUBLE:
+          Operators.DoubleColumn doubleCol = doubleColumn(name);
+          return op(op, doubleCol, value);
+        case DataType.CHARARRAY:
+          Operators.BinaryColumn binaryCol = binaryColumn(name);
+          return op(op, binaryCol, value);
+        default:
+          throw new RuntimeException("Unsupported type " + f.type + " for field: " + name);
+      }
+    } catch (FrontendException e) {
+      throw new RuntimeException("Error processing pushdown for column:" + col, e);
+    }
+  }
+
+  private static <C extends Comparable<C>, COL extends Operators.Column<C> & Operators.SupportsLtGt>
+  FilterPredicate op(Expression.OpType op, COL col, Const valueExpr) {
+    C value = getValue(valueExpr, col.getColumnType());
+    switch (op) {
+      case OP_EQ: return eq(col, value);
+      case OP_NE: return notEq(col, value);
+      case OP_GT: return gt(col, value);
+      case OP_GE: return gtEq(col, value);
+      case OP_LT: return lt(col, value);
+      case OP_LE: return ltEq(col, value);
+    }
+    return null;
+  }
+
+  private static <C extends Comparable<C>> C getValue(Const valueExpr, Class<C> type) {
+    Object value = valueExpr.getValue();
+
+    if (value instanceof String) {
+      value = Binary.fromString((String) value);
+    }
+
+    return type.cast(value);
+  }
+
 }
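
For orientation, a sketch (not part of the commit) of the FilterPredicate that buildFilter assembles for the test filter "c1 == 1 or c1 == 5", plus the chararray case where getValue converts the String constant with Binary.fromString; column names follow the test below:

    import static org.apache.parquet.filter2.predicate.FilterApi.*;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.io.api.Binary;

    // OP_OR over two OP_EQ leaves, as produced for "FILTER B BY c1 == 1 OR c1 == 5":
    FilterPredicate byInt = or(eq(intColumn("c1"), 1), eq(intColumn("c1"), 5));
    // Chararray constants are wrapped by getValue via Binary.fromString:
    FilterPredicate byString = eq(binaryColumn("c4"), Binary.fromString("v1"));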

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/fb46b941/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
----------------------------------------------------------------------
diff --git a/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java b/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
index 6f11538..8e57424 100644
--- a/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
+++ b/parquet-pig/src/test/java/org/apache/parquet/pig/TestParquetLoader.java
@@ -22,17 +22,21 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadPushDown.RequiredField;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
 import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataType;
 import static org.apache.pig.data.DataType.*;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.junit.Assert;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
@@ -175,11 +179,11 @@ public class TestParquetLoader {
     for (int i = 0; i < rows; i++) {
       list.add(Storage.tuple(i, "a"+i, i*2));
     }
-    data.set("in", "i:int, a:chararray, b:int", list );
+    data.set("in", "i:int, a:chararray, b:int", list);
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
-    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName()
+ "();");
     pigServer.executeBatch();
       
     //Test Null Padding at the end 
@@ -212,7 +216,7 @@ public class TestParquetLoader {
     for (int i = 0; i < rows; i++) {
       list.add(Storage.tuple(i, (long)i, (float)i, (double)i, Integer.toString(i), Boolean.TRUE));
     }
-    data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list );
+    data.set("in", "i:int, l:long, f:float, d:double, s:chararray, b:boolean", list);
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
@@ -268,11 +272,11 @@ public class TestParquetLoader {
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
-    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName()
+ "();");
     pigServer.executeBatch();
       
     //Test Null Padding at the end 
-    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int,
n2:double, n3:long, n4:chararray', 'true');");
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()
+ "('n1:int, n2:double, n3:long, n4:chararray', 'true');");
     pigServer.registerQuery("STORE B into 'out' using mock.Storage();");
     pigServer.executeBatch();
     
@@ -285,7 +289,7 @@ public class TestParquetLoader {
       assertEquals(4, t.size());
       
       assertEquals(i, t.get(0));
-      assertEquals(i*1.0, t.get(1));
+      assertEquals(i * 1.0, t.get(1));
       assertEquals(i*2L, t.get(2));
       assertEquals("v"+i, t.get(3));
     }
@@ -306,10 +310,10 @@ public class TestParquetLoader {
     pigServer.setBatchOn();
     pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
     pigServer.deleteFile(out);
-    pigServer.registerQuery("Store A into '"+out+"' using " + ParquetStorer.class.getName()+"();");
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName()
+ "();");
     pigServer.executeBatch();
     
-    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()+"('n1:int,
n2:double, n3:long, n4:chararray', 'true');");
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()
+ "('n1:int, n2:double, n3:long, n4:chararray', 'true');");
     pigServer.registerQuery("C = foreach B generate n1, n3;");
     pigServer.registerQuery("STORE C into 'out' using mock.Storage();");
     pigServer.executeBatch();
@@ -325,10 +329,39 @@ public class TestParquetLoader {
       assertEquals(i, t.get(0));
       assertEquals(i*2L, t.get(1));
     }
-  }  
-  
+  }
+
   @Test
-  public void testRead() {
-    
+  public void testPredicatePushdown() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetLoader.ENABLE_PREDICATE_FILTER_PUSHDOWN, true);
+
+    PigServer pigServer = new PigServer(ExecType.LOCAL, conf);
+    pigServer.setValidateEachStatement(true);
+
+    String out = "target/out";
+    String out2 = "target/out2";
+    int rows = 10;
+    Data data = Storage.resetData(pigServer);
+    List<Tuple> list = new ArrayList<Tuple>();
+    for (int i = 0; i < rows; i++) {
+      list.add(Storage.tuple(i, i*1.0, i*2L, "v"+i));
+    }
+    data.set("in", "c1:int, c2:double, c3:long, c4:chararray", list);
+    pigServer.setBatchOn();
+    pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+    pigServer.deleteFile(out);
+    pigServer.registerQuery("Store A into '" + out + "' using " + ParquetStorer.class.getName()
+ "();");
+    pigServer.executeBatch();
+
+    pigServer.deleteFile(out2);
+    pigServer.registerQuery("B = LOAD '" + out + "' using " + ParquetLoader.class.getName()
+ "('c1:int, c2:double, c3:long, c4:chararray');");
+    pigServer.registerQuery("C = FILTER B by c1 == 1 or c1 == 5;");
+    pigServer.registerQuery("STORE C into '" + out2 +"' using mock.Storage();");
+    List<ExecJob> jobs = pigServer.executeBatch();
+
+    long recordsRead = jobs.get(0).getStatistics().getInputStats().get(0).getNumberRecords();
+
+    assertEquals(2, recordsRead);
   }
 }
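
The OP_NOT branch in buildFilter wraps the inverted subtree in not() and then normalizes it with LogicalInverseRewriter; a sketch of the effect, assuming filter2's rewriter pushes negations down to the leaves:

    import static org.apache.parquet.filter2.predicate.FilterApi.*;
    import org.apache.parquet.filter2.predicate.FilterPredicate;
    import org.apache.parquet.filter2.predicate.LogicalInverseRewriter;

    // NOT (c1 == 1) is built as not(eq(...)) and rewritten to notEq(c1, 1),
    // so the executed predicate carries no top-level not() node.
    FilterPredicate pred = LogicalInverseRewriter.rewrite(not(eq(intColumn("c1"), 1)));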

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/fb46b941/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 98fd862..f606faa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
     <!-- scala.binary.version is used for projects that fetch dependencies that are in scala -->
     <scala.binary.version>2.10</scala.binary.version>
     <scala.maven.test.skip>false</scala.maven.test.skip>
-    <pig.version>0.11.1</pig.version>
+    <pig.version>0.14.0</pig.version>
     <pig.classifier/>
     <thrift.version>0.7.0</thrift.version>
     <fastutil.version>6.5.7</fastutil.version>
@@ -524,7 +524,7 @@
         <!-- test hadoop-1 with the same jars that were produced for default profile -->
         <maven.main.skip>true</maven.main.skip>
         <hadoop.version>2.3.0</hadoop.version>
-        <pig.version>0.13.0</pig.version>
+        <pig.version>0.14.0</pig.version>
         <pig.classifier>h2</pig.classifier>
       </properties>
     </profile>

