Return-Path: Delivered-To: apmail-hadoop-pig-commits-archive@www.apache.org Received: (qmail 27009 invoked from network); 11 Feb 2010 22:13:14 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 11 Feb 2010 22:13:14 -0000 Received: (qmail 16846 invoked by uid 500); 11 Feb 2010 22:13:14 -0000 Delivered-To: apmail-hadoop-pig-commits-archive@hadoop.apache.org Received: (qmail 16795 invoked by uid 500); 11 Feb 2010 22:13:13 -0000 Mailing-List: contact pig-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: pig-dev@hadoop.apache.org Delivered-To: mailing list pig-commits@hadoop.apache.org Received: (qmail 16775 invoked by uid 500); 11 Feb 2010 22:13:13 -0000 Delivered-To: apmail-incubator-pig-commits@incubator.apache.org Received: (qmail 16771 invoked by uid 99); 11 Feb 2010 22:13:13 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2010 22:13:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2010 22:13:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id B15DE2388A68; Thu, 11 Feb 2010 22:12:49 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r909165 [5/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica... 
Date: Thu, 11 Feb 2010 22:12:43 -0000 To: pig-commits@incubator.apache.org From: gates@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100211221249.B15DE2388A68@eris.apache.org> Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java?rev=909165&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java Thu Feb 11 22:12:36 2010 @@ -0,0 +1,503 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test; + +import java.io.IOException; +import java.util.*; +import org.apache.pig.data.DataType; +import org.apache.pig.experimental.logical.expression.*; +import org.apache.pig.experimental.logical.relational.LOFilter; +import org.apache.pig.experimental.logical.relational.LOJoin; +import org.apache.pig.experimental.logical.relational.LOLoad; +import org.apache.pig.experimental.logical.relational.LOStore; +import org.apache.pig.experimental.logical.relational.LogicalPlan; +import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; +import org.apache.pig.experimental.logical.relational.LogicalSchema; +import org.apache.pig.experimental.logical.rules.MergeFilter; +import org.apache.pig.experimental.logical.rules.PushUpFilter; +import org.apache.pig.experimental.logical.rules.SplitFilter; +import org.apache.pig.experimental.plan.Operator; +import org.apache.pig.experimental.plan.OperatorPlan; +import org.apache.pig.experimental.plan.optimizer.PlanOptimizer; +import org.apache.pig.experimental.plan.optimizer.PlanTransformListener; +import org.apache.pig.experimental.plan.optimizer.Rule; +import org.apache.pig.impl.util.MultiMap; + +import junit.framework.TestCase; + +public class TestExperimentalFilterRule extends TestCase { + + LogicalPlan plan = null; + LogicalRelationalOperator load1 = null; + LogicalRelationalOperator load2 = null; + LogicalRelationalOperator filter = null; + LogicalRelationalOperator join = null; + LogicalRelationalOperator store = null; + + private void prep() { + plan = new LogicalPlan(); + LogicalSchema schema = new LogicalSchema(); + schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER)); + schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY)); + schema.addField(new LogicalSchema.LogicalFieldSchema("age", null, DataType.INTEGER)); + schema.getField(0).uid = 1; + schema.getField(1).uid = 2; + schema.getField(2).uid = 3; + 
LogicalRelationalOperator l1 = new LOLoad(null, schema, plan); + l1.setAlias("A"); + plan.add(l1); + + schema = new LogicalSchema(); + schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER)); + schema.addField(new LogicalSchema.LogicalFieldSchema("dept", null, DataType.INTEGER)); + schema.addField(new LogicalSchema.LogicalFieldSchema("salary", null, DataType.FLOAT)); + schema.getField(0).uid = 4; + schema.getField(1).uid = 5; + schema.getField(2).uid = 6; + LogicalRelationalOperator l2 = new LOLoad(null, schema, plan); + l2.setAlias("B"); + plan.add(l2); + + MultiMap joinPlans = new MultiMap(); + LogicalExpressionPlan p1 = new LogicalExpressionPlan(); + ProjectExpression lp1 = new ProjectExpression(p1, DataType.CHARARRAY, 0, 1); + p1.add(lp1); + joinPlans.put(0, p1); + + LogicalExpressionPlan p2 = new LogicalExpressionPlan(); + ProjectExpression lp2 = new ProjectExpression(p2, DataType.INTEGER, 1, 1); + p2.add(lp2); + joinPlans.put(1, p2); + + LogicalRelationalOperator j1 = new LOJoin(plan, joinPlans, LOJoin.JOINTYPE.HASH, new boolean[]{true, true}); + j1.setAlias("C"); + plan.add(j1); + + + // build an expression with no AND + LogicalExpressionPlan p3 = new LogicalExpressionPlan(); + LogicalExpression lp3 = new ProjectExpression(p3, DataType.INTEGER, 0, 2); + LogicalExpression cont = new ConstantExpression(p3, DataType.INTEGER, new Integer(3)); + p3.add(lp3); + p3.add(cont); + LogicalExpression eq = new EqualExpression(p3, lp3, cont); + + LogicalRelationalOperator f1 = new LOFilter(plan, p3); + f1.setAlias("D"); + plan.add(f1); + + LogicalRelationalOperator s1 = new LOStore(plan); + plan.add(s1); + + // load --|-join - filter - store + // load --| + plan.connect(l1, j1); + plan.connect(l2, j1); + plan.connect(j1, f1); + plan.connect(f1, s1); + + try { + lp1.setUid(j1); + lp2.setUid(j1); + lp3.setUid(f1); + }catch(Exception e) { + + } + + filter = f1; + store = s1; + join = j1; + load1 = l1; + load2 = l2; + } + + public void 
testFilterRule() throws Exception { + prep(); + // run split filter rule + Rule r = new SplitFilter("SplitFilter"); + Set s = new HashSet(); + s.add(r); + List> ls = new ArrayList>(); + ls.add(s); + MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + assertEquals(plan.getPredecessors(filter).get(0), join); + assertEquals(plan.getSuccessors(filter).get(0), store); + + // run push up filter rule + r = new PushUpFilter("PushUpFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + // the filter should be moved up to be after load + assertEquals(plan.getSuccessors(load1).get(0), filter); + assertEquals(plan.getSuccessors(filter).get(0), join); + assertEquals(plan.getSuccessors(join).get(0), store); + + // run merge filter rule + r = new MergeFilter("MergeFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + // the filter should the same as before, nothing to merge + assertEquals(plan.getSuccessors(load1).get(0), filter); + assertEquals(plan.getSuccessors(filter).get(0), join); + assertEquals(plan.getSuccessors(join).get(0), store); + } + + // build an expression with 1 AND, it should split into 2 filters + public void testFilterRuleWithAnd() throws Exception { + prep(); + + LogicalExpressionPlan p4 = new LogicalExpressionPlan(); + LogicalExpression lp3 = new ProjectExpression(p4, DataType.INTEGER, 0, 2); + LogicalExpression cont = new ConstantExpression(p4, DataType.INTEGER, new Integer(3)); + p4.add(lp3); + p4.add(cont); + LogicalExpression eq = new EqualExpression(p4, lp3, cont); + + LogicalExpression lp4 = new ProjectExpression(p4, DataType.FLOAT, 0, 5); + LogicalExpression cont2 = new ConstantExpression(p4, DataType.FLOAT, new Float(100)); + p4.add(lp4); + p4.add(cont2); + LogicalExpression eq2 = new EqualExpression(p4, lp4, 
cont2); + + LogicalExpression and = new AndExpression(p4, eq, eq2); + + lp3.setUid(filter); + lp4.setUid(filter); + + ((LOFilter)filter).setFilterPlan(p4); + + // run split filter rule + Rule r = new SplitFilter("SplitFilter"); + Set s = new HashSet(); + s.add(r); + List> ls = new ArrayList>(); + ls.add(s); + PlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + assertEquals(plan.getPredecessors(filter).get(0), join); + Operator next = plan.getSuccessors(filter).get(0); + assertEquals(LOFilter.class, next.getClass()); + next = plan.getSuccessors(next).get(0); + assertEquals(LOStore.class, next.getClass()); + + // run push up filter rule + r = new PushUpFilter("PushUpFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + // both filters should be moved up to be after each load + next = plan.getSuccessors(load1).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(load2).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + assertEquals(plan.getSuccessors(join).get(0), store); + + // run merge filter rule + r = new MergeFilter("MergeFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + // the filters should the same as before, nothing to merge + next = plan.getSuccessors(load1).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(load2).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + assertEquals(plan.getSuccessors(join).get(0), store); + } + + public void testFilterRuleWith2And() throws Exception { + prep(); + // build an 
expression with 2 AND, it should split into 3 filters + LogicalExpressionPlan p5 = new LogicalExpressionPlan(); + + + LogicalExpression lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 2); + LogicalExpression cont = new ConstantExpression(p5, DataType.INTEGER, new Integer(3)); + p5.add(lp3); + p5.add(cont); + LogicalExpression eq = new EqualExpression(p5, lp3, cont); + + LogicalExpression lp4 = new ProjectExpression(p5, DataType.INTEGER, 0, 3); + LogicalExpression cont2 = new ConstantExpression(p5, DataType.INTEGER, new Integer(3)); + p5.add(lp4); + p5.add(cont2); + LogicalExpression eq2 = new EqualExpression(p5, lp4, cont2); + + lp3.setUid(filter); + lp4.setUid(filter); + + LogicalExpression and1 = new AndExpression(p5, eq, eq2); + + + lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 0); + lp4 = new ProjectExpression(p5, DataType.INTEGER, 0, 3); + lp3.setUid(filter); + lp4.setUid(filter); + p5.add(lp3); + p5.add(lp4); + eq2 = new EqualExpression(p5, lp3, lp4); + + LogicalExpression and2 = new AndExpression(p5, and1, eq2); + + ((LOFilter)filter).setFilterPlan(p5); + + Rule r = new SplitFilter("SplitFilter"); + Set s = new HashSet(); + s.add(r); + List> ls = new ArrayList>(); + ls.add(s); + MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3); + MyPlanTransformListener listener = new MyPlanTransformListener(); + optimizer.addPlanTransformListener(listener); + optimizer.optimize(); + + assertEquals(plan.getPredecessors(filter).get(0), join); + Operator next = plan.getSuccessors(filter).get(0); + assertEquals(next.getClass(), LOFilter.class); + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOFilter.class); + next = plan.getSuccessors(next).get(0); + assertEquals(LOStore.class, next.getClass()); + + OperatorPlan transformed = listener.getTransformed(); + assertEquals(transformed.size(), 3); + + // run push up filter rule + r = new PushUpFilter("PushUpFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + 
ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + listener = new MyPlanTransformListener(); + optimizer.addPlanTransformListener(listener); + optimizer.optimize(); + + // 2 filters should be moved up to be after each load, and one filter should remain + next = plan.getSuccessors(load1).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(load2).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(join).get(0); + assertEquals(next.getClass(), LOFilter.class); + + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOStore.class); + + transformed = listener.getTransformed(); + assertEquals(transformed.size(), 4); + assertEquals(transformed.getSinks().get(0).getClass(), LOFilter.class); + assertEquals(transformed.getSources().get(0).getClass(), LOLoad.class); + + // run merge filter rule + r = new MergeFilter("MergeFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + listener = new MyPlanTransformListener(); + optimizer.addPlanTransformListener(listener); + optimizer.optimize(); + + // the filters should the same as before, nothing to merge + next = plan.getSuccessors(load1).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(load2).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(join).get(0); + assertEquals(next.getClass(), LOFilter.class); + + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOStore.class); + + transformed = listener.getTransformed(); + assertNull(transformed); + } + + public void testFilterRuleWith2And2() throws Exception { + prep(); + // build an expression with 
2 AND, it should split into 3 filters + LogicalExpressionPlan p5 = new LogicalExpressionPlan(); + + LogicalExpression lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 2); + lp3.setUid(filter); + LogicalExpression cont = new ConstantExpression(p5, DataType.INTEGER, new Integer(3)); + p5.add(lp3); + p5.add(cont); + LogicalExpression eq = new EqualExpression(p5, lp3, cont); + + lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 0); + LogicalExpression lp4 = new ProjectExpression(p5, DataType.INTEGER, 0, 3); + p5.add(lp4); + p5.add(lp3); + lp3.setUid(filter); + lp4.setUid(filter); + LogicalExpression eq2 = new EqualExpression(p5, lp3, lp4); + + LogicalExpression and1 = new AndExpression(p5, eq, eq2); + + lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 2); + lp4 = new ProjectExpression(p5, DataType.FLOAT, 0, 5); + p5.add(lp3); + p5.add(lp4); + lp3.setUid(filter); + lp4.setUid(filter); + eq2 = new EqualExpression(p5, lp3, lp4); + + LogicalExpression and2 = new AndExpression(p5, and1, eq2); + + ((LOFilter)filter).setFilterPlan(p5); + + Rule r = new SplitFilter("SplitFilter"); + Set s = new HashSet(); + s.add(r); + List> ls = new ArrayList>(); + ls.add(s); + MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + assertEquals(plan.getPredecessors(filter).get(0), join); + Operator next = plan.getSuccessors(filter).get(0); + assertEquals(next.getClass(), LOFilter.class); + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOFilter.class); + next = plan.getSuccessors(next).get(0); + assertEquals(LOStore.class, next.getClass()); + + // run push up filter rule + r = new PushUpFilter("PushUpFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + optimizer.optimize(); + + // 1 filter should be moved up to be after a load, and 2 filters should remain + next = plan.getSuccessors(load1).get(0); + assertEquals(next.getClass(), LOFilter.class); 
+ assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(load2).get(0); + assertEquals(next, join); + + next = plan.getSuccessors(join).get(0); + assertEquals(next.getClass(), LOFilter.class); + + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOFilter.class); + + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOStore.class); + + // run merge filter rule + r = new MergeFilter("MergeFilter"); + s = new HashSet(); + s.add(r); + ls = new ArrayList>(); + ls.add(s); + optimizer = new MyPlanOptimizer(plan, ls, 3); + MyPlanTransformListener listener = new MyPlanTransformListener(); + optimizer.addPlanTransformListener(listener); + optimizer.optimize(); + + // the 2 filters after join should merge + next = plan.getSuccessors(load1).get(0); + assertEquals(next.getClass(), LOFilter.class); + assertEquals(plan.getSuccessors(next).get(0), join); + + next = plan.getSuccessors(load2).get(0); + assertEquals(next, join); + + next = plan.getSuccessors(join).get(0); + assertEquals(next.getClass(), LOFilter.class); + + next = plan.getSuccessors(next).get(0); + assertEquals(next.getClass(), LOStore.class); + + OperatorPlan transformed = listener.getTransformed(); + assertEquals(transformed.size(), 2); + } + + public class MyPlanOptimizer extends PlanOptimizer { + + protected MyPlanOptimizer(OperatorPlan p, List> rs, + int iterations) { + super(p, rs, iterations); + } + + public void addPlanTransformListener(PlanTransformListener listener) { + super.addPlanTransformListener(listener); + } + + } + + public class MyPlanTransformListener implements PlanTransformListener { + + private OperatorPlan tp; + + @Override + public void transformed(OperatorPlan fp, OperatorPlan tp) + throws IOException { + this.tp = tp; + } + + public OperatorPlan getTransformed() { + return tp; + } + } +} Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java URL: 
http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java?rev=909165&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java Thu Feb 11 22:12:36 2010 @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.pig.test; + +import java.io.IOException; +import java.util.List; + +import org.apache.pig.data.DataType; +import org.apache.pig.experimental.logical.expression.AndExpression; +import org.apache.pig.experimental.logical.expression.ConstantExpression; +import org.apache.pig.experimental.logical.expression.EqualExpression; +import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan; +import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor; +import org.apache.pig.experimental.logical.expression.ProjectExpression; +import org.apache.pig.experimental.logical.optimizer.AllExpressionVisitor; +import org.apache.pig.experimental.logical.optimizer.AllSameVisitor; +import org.apache.pig.experimental.logical.optimizer.ProjectionPatcher; +import org.apache.pig.experimental.logical.optimizer.SchemaPatcher; +import org.apache.pig.experimental.logical.relational.LOFilter; +import org.apache.pig.experimental.logical.relational.LOJoin; +import org.apache.pig.experimental.logical.relational.LOLoad; +import org.apache.pig.experimental.logical.relational.LogicalPlan; +import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; +import org.apache.pig.experimental.logical.relational.LogicalSchema; +import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE; +import org.apache.pig.experimental.plan.DepthFirstWalker; +import org.apache.pig.experimental.plan.Operator; +import org.apache.pig.experimental.plan.OperatorPlan; +import org.apache.pig.impl.util.MultiMap; +import org.junit.Before; +import org.junit.Test; + +import junit.framework.TestCase; + +/** + * Tests for PlanTransformListerns + * + */ +public class TestExperimentalListener extends TestCase { + + private LogicalPlan lp; + private LogicalPlan changedPlan; + + /* (non-Javadoc) + * @see junit.framework.TestCase#setUp() + */ + @Before + protected void setUp() throws Exception { + super.setUp(); + + // Build a plan that looks like it 
has just been transformed + // It is roughly the logical plan for + // A = load 'bla' as (x); + // B = load 'morebla' as (y); + // C = join A on x, B on y; + // D = filter C by y > 0; + // The plan is built with the filter having been pushed above the join + // but the listners not yet having been called. + // A = load + lp = new LogicalPlan(); + LogicalSchema aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema( + "x", null, DataType.INTEGER)); + aschema.getField(0).uid = 1; + LOLoad A = new LOLoad(null, aschema, lp); + lp.add(A); + + // B = load + LogicalSchema bschema = new LogicalSchema(); + bschema.addField(new LogicalSchema.LogicalFieldSchema( + "y", null, DataType.INTEGER)); + bschema.getField(0).uid = 2; + LOLoad B = new LOLoad(null, bschema, lp); + lp.add(B); + + // C = join + LogicalSchema cschema = new LogicalSchema(); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "x", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "y", null, DataType.INTEGER)); + cschema.getField(0).uid = 1; + cschema.getField(1).uid = 2; + LogicalExpressionPlan aprojplan = new LogicalExpressionPlan(); + ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0); + x.neverUseForRealSetUid(1); + LogicalExpressionPlan bprojplan = new LogicalExpressionPlan(); + ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0); + y.neverUseForRealSetUid(2); + MultiMap mm = + new MultiMap(); + mm.put(0, aprojplan); + mm.put(1, bprojplan); + LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true}); + C.neverUseForRealSetSchema(cschema); + // Don't add it to the plan quite yet + + // D = filter + LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); + ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1); + fy.neverUseForRealSetUid(2); + ConstantExpression fc = new ConstantExpression(filterPlan, DataType.INTEGER, new 
Integer(0)); + new EqualExpression(filterPlan, fy, fc); + LOFilter D = new LOFilter(lp, filterPlan); + D.neverUseForRealSetSchema(cschema); + // Connect D to B, since the transform has happened. + lp.add(B, D, (LogicalRelationalOperator)null); + + // Now add in C, connected to A and D. + lp.add(new LogicalRelationalOperator[] {A, D}, C, null); + + changedPlan = new LogicalPlan(); + changedPlan.add(D); + changedPlan.add(D, C, (LogicalRelationalOperator)null); + } + + private static class SillySameVisitor extends AllSameVisitor { + StringBuffer buf = new StringBuffer(); + + SillySameVisitor(OperatorPlan plan) { + super(plan, new DepthFirstWalker(plan)); + } + + @Override + protected void execute(LogicalRelationalOperator op) throws IOException { + buf.append(op.getName()); + buf.append(" "); + } + + @Override + public String toString() { + return buf.toString(); + } + + } + + // Test that the AllSameVisitor calls execute on every node + // in the plan. + @Test + public void testAllSameVisitor() throws IOException { + SillySameVisitor v = new SillySameVisitor(lp); + v.visit(); + System.out.println(v.toString()); + assertTrue("LOLoad LOJoin LOLoad LOFilter ".equals(v.toString()) || + "LOLoad LOFilter LOJoin LOLoad ".equals(v.toString())); + + } + + private static class SillyExpressionVisitor extends LogicalExpressionVisitor { + StringBuffer buf; + + protected SillyExpressionVisitor(OperatorPlan p, StringBuffer b) { + super(p, new DepthFirstWalker(p)); + buf = b; + } + + @Override + public void visitAnd(AndExpression andExpr) throws IOException { + buf.append("and "); + } + + @Override + public void visitEqual(EqualExpression equal) throws IOException { + buf.append("equal "); + } + + @Override + public void visitProject(ProjectExpression p) throws IOException { + buf.append("proj "); + } + + @Override + public void visitConstant(ConstantExpression c) throws IOException { + buf.append("const "); + } + } + + private static class SillyAllExpressionVisitor extends 
AllExpressionVisitor { + StringBuffer buf = new StringBuffer(); + + public SillyAllExpressionVisitor(OperatorPlan plan) { + super(plan, new DepthFirstWalker(plan)); + } + + + @Override + protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) { + return new SillyExpressionVisitor(expr, buf); + } + + @Override + public String toString() { + return buf.toString(); + } + } + + // Test that the AllExpressionVisitor executes on every + // expression in the plan + @Test + public void testAllExpressionVisitor() throws IOException { + SillyAllExpressionVisitor v = new SillyAllExpressionVisitor(lp); + v.visit(); + assertTrue("proj proj equal proj const ".equals(v.toString()) || + "equal proj const proj proj ".equals(v.toString())); + } + + // Test that schemas are patched up after a transform + @Test + public void testSchemaPatcher() throws IOException { + SchemaPatcher patcher = new SchemaPatcher(); + patcher.transformed(lp, changedPlan); + + // Check that the filter now has the proper schema. 
+ List roots = changedPlan.getSources(); + assertEquals(1, roots.size()); + LOFilter D = (LOFilter)roots.get(0); + assertNotNull(D); + LogicalSchema dschema = D.getSchema(); + assertEquals(1, dschema.size()); + LogicalSchema.LogicalFieldSchema y = dschema.getField(0); + assertEquals("y", y.alias); + assertEquals(2, y.uid); + } + + // Test that projections are patched up after a transform + @Test + public void testProjectionPatcher() throws IOException { + ProjectionPatcher patcher = new ProjectionPatcher(); + patcher.transformed(lp, changedPlan); + + // Check that the projections in filter are now set properly + List roots = changedPlan.getSources(); + assertEquals(1, roots.size()); + LOFilter D = (LOFilter)roots.get(0); + assertNotNull(D); + LogicalExpressionPlan filterPlan = D.getFilterPlan(); + List leaves = filterPlan.getSinks(); + assertEquals(2, leaves.size()); + ProjectExpression proj = null; + for (Operator leaf : leaves) { + if (leaf instanceof ProjectExpression) { + proj = (ProjectExpression)leaf; + break; + } + } + assertNotNull(proj); + assertEquals(0, proj.getInputNum()); + assertEquals(0, proj.getColNum()); + } + +} + Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java?rev=909165&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java Thu Feb 11 22:12:36 2010 @@ -0,0 +1,537 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.IOException; +import java.util.List; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.data.DataType;
import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
import org.apache.pig.experimental.logical.expression.LogicalExpression;
import org.apache.pig.experimental.logical.optimizer.UidStamper;
import org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor;
import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
import org.apache.pig.experimental.logical.relational.LogicalSchema;
import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.OperatorPlan;
import org.apache.pig.experimental.plan.PlanVisitor;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.test.utils.LogicalPlanTester;

import junit.framework.TestCase;

/**
 * Tests for {@link LogToPhyTranslationVisitor}. Each test builds a Pig Latin script
 * with {@link LogicalPlanTester}, migrates the resulting logical plan to the
 * experimental logical plan, translates that into a {@link PhysicalPlan}, and checks
 * the shape of the physical plan.
 */
public class TestExperimentalLogToPhyTranslationVisitor extends TestCase {

    /** Translates an experimental logical plan into a physical plan. */
    private PhysicalPlan translatePlan(OperatorPlan plan) throws IOException {
        LogToPhyTranslationVisitor visitor = new LogToPhyTranslationVisitor(plan);
        visitor.visit();
        return visitor.getPhysicalPlan();
    }

    /**
     * Converts an old-style logical plan into the experimental logical plan and then
     * runs {@link UidStamper} over it.
     *
     * @throws VisitorException if migration fails; any exception raised by the uid
     *         stamper is wrapped in a VisitorException
     */
    private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp)
            throws VisitorException {
        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
        visitor.visit();
        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();

        try {
            UidStamper stamper = new UidStamper(newPlan);
            stamper.visit();
            return newPlan;
        } catch (Exception e) {
            throw new VisitorException(e);
        }
    }

    /**
     * Resets LogicalExpression's next-uid counter before each test; testForeachPlan2
     * asserts specific uid values.
     */
    @Override
    protected void setUp() throws Exception {
        LogicalExpression.resetNextUid();
    }

    /** Asserts that {@code op} is a POLoad whose file name contains {@code file}. */
    private static void assertLoadOf(PhysicalOperator op, String file) {
        assertEquals(POLoad.class, op.getClass());
        assertTrue(((POLoad) op).getLFile().getFileName().contains(file));
    }

    /** Asserts that {@code op} is a POStore whose file name contains {@code file}. */
    private static void assertStoreTo(PhysicalOperator op, String file) {
        assertEquals(POStore.class, op.getClass());
        assertTrue(((POStore) op).getSFile().getFileName().contains(file));
    }

    /** Asserts that {@code op} is a POProject whose column is {@code column}. */
    private static void assertProjectOf(PhysicalOperator op, int column) throws Exception {
        assertEquals(POProject.class, op.getClass());
        assertEquals(column, ((POProject) op).getColumn());
    }

    /** Asserts that {@code op} is a ConstantExpression whose value is null. */
    private static void assertNullConstant(PhysicalOperator op) {
        assertEquals(ConstantExpression.class, op.getClass());
        assertNull(((ConstantExpression) op).getValue());
    }

    /**
     * Asserts that {@code op} is a POLocalRearrange whose first key plan ends in a
     * projection of {@code keyColumn} and whose first input is a load of {@code file}.
     */
    private static void assertLoadRearrange(PhysicalOperator op, int keyColumn, String file)
            throws Exception {
        assertEquals(POLocalRearrange.class, op.getClass());
        POLocalRearrange rearrange = (POLocalRearrange) op;
        assertProjectOf(rearrange.getPlans().get(0).getLeaves().get(0), keyColumn);
        assertLoadOf(rearrange.getInputs().get(0), file);
    }

    /**
     * Asserts that {@code op} is a POForEach whose first two inner plans end in
     * projections of {@code firstColumn} and {@code secondColumn}.
     */
    private static void assertForEachProjects(PhysicalOperator op, int firstColumn, int secondColumn)
            throws Exception {
        assertEquals(POForEach.class, op.getClass());
        POForEach forEach = (POForEach) op;
        assertProjectOf(forEach.getInputPlans().get(0).getLeaves().get(0), firstColumn);
        assertProjectOf(forEach.getInputPlans().get(1).getLeaves().get(0), secondColumn);
    }

    /**
     * Asserts that {@code plan} consists of exactly one POProject of {@code column}
     * whose first input is {@code input}.
     */
    private static void assertSingleProjectFrom(PhysicalPlan plan, int column, PhysicalOperator input)
            throws Exception {
        assertEquals(1, plan.size());
        POProject prj = (POProject) plan.getRoots().get(0);
        assertEquals(column, prj.getColumn());
        assertEquals(input, prj.getInputs().get(0));
    }

    public void testSimplePlan() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd.txt';");
        lpt.buildPlan("b = filter a by $0==NULL;");
        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");

        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);

        assertEquals(3, phyPlan.size());
        assertEquals(1, phyPlan.getRoots().size());
        assertEquals(1, phyPlan.getLeaves().size());

        PhysicalOperator load = phyPlan.getRoots().get(0);
        assertLoadOf(load, "d.txt");

        // Filter: project $0 and a null constant feeding an equality
        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
        assertEquals(POFilter.class, fil.getClass());
        PhysicalPlan filPlan = ((POFilter) fil).getPlan();
        assertEquals(2, filPlan.getRoots().size());
        assertEquals(1, filPlan.getLeaves().size());
        assertEquals(EqualToExpr.class, filPlan.getLeaves().get(0).getClass());
        assertProjectOf(filPlan.getRoots().get(0), 0);
        assertNullConstant(filPlan.getRoots().get(1));

        // Store
        assertStoreTo(phyPlan.getSuccessors(fil).get(0), "empty");
    }

    public void testJoinPlan() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd1.txt' as (id, c);");
        lpt.buildPlan("b = load 'd2.txt'as (id, c);");
        lpt.buildPlan("c = join a by id, b by c;");
        lpt.buildPlan("d = filter c by a::id==NULL AND b::c==NULL;");
        LogicalPlan plan = lpt.buildPlan("store d into 'empty';");

        // check basics
        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
        PhysicalPlan physicalPlan = translatePlan(newPlan);
        assertEquals(9, physicalPlan.size());
        assertEquals(2, physicalPlan.getRoots().size());

        // Each load feeds a local rearrange keyed on its join column
        PhysicalOperator rearrangeA = physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
        assertLoadRearrange(rearrangeA, 0, "d1.txt");
        PhysicalOperator rearrangeB = physicalPlan.getSuccessors(physicalPlan.getRoots().get(1)).get(0);
        assertLoadRearrange(rearrangeB, 1, "d2.txt");

        // Global rearrange -> package -> foreach
        PhysicalOperator globalRearrange = physicalPlan.getSuccessors(rearrangeA).get(0);
        assertEquals(POGlobalRearrange.class, globalRearrange.getClass());
        PhysicalOperator pack = physicalPlan.getSuccessors(globalRearrange).get(0);
        assertEquals(POPackage.class, pack.getClass());
        PhysicalOperator forEach = physicalPlan.getSuccessors(pack).get(0);
        assertForEachProjects(forEach, 1, 2);

        // Filter roots: [project $3, null, project $0, null]
        PhysicalOperator fil = physicalPlan.getSuccessors(forEach).get(0);
        assertEquals(POFilter.class, fil.getClass());
        List<PhysicalOperator> filRoots = ((POFilter) fil).getPlan().getRoots();
        assertProjectOf(filRoots.get(0), 3);
        assertNullConstant(filRoots.get(1));
        assertProjectOf(filRoots.get(2), 0);
        assertNullConstant(filRoots.get(3));

        // Store
        assertStoreTo(physicalPlan.getSuccessors(fil).get(0), "empty");
    }

    public void testMultiStore() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd1.txt' as (id, c);");
        lpt.buildPlan("b = load 'd2.txt'as (id, c);");
        lpt.buildPlan("c = load 'd3.txt' as (id, c);");
        lpt.buildPlan("d = join a by id, b by c;");
        lpt.buildPlan("e = filter d by a::id==NULL AND b::c==NULL;");
        lpt.buildPlan("f = join e by b::c, c by id;");
        lpt.buildPlan("g = filter f by b::id==NULL AND c::c==NULL;");
        LogicalPlan plan = lpt.buildPlan("store g into 'empty2';");

        // check basics
        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
        assertEquals(16, phyPlan.size());
        assertEquals(3, phyPlan.getRoots().size());
        assertEquals(1, phyPlan.getLeaves().size());

        // Roots 0 and 1: load -> local rearrange; root 0's rearrange leads to a global rearrange and package
        PhysicalOperator rearrange0 = phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
        assertLoadRearrange(rearrange0, 0, "d3.txt");
        PhysicalOperator rearrange1 = phyPlan.getSuccessors(phyPlan.getRoots().get(1)).get(0);
        assertLoadRearrange(rearrange1, 1, "d2.txt");

        PhysicalOperator globalRearrange = phyPlan.getSuccessors(rearrange0).get(0);
        assertEquals(POGlobalRearrange.class, globalRearrange.getClass());
        PhysicalOperator pack = phyPlan.getSuccessors(globalRearrange).get(0);
        assertEquals(POPackage.class, pack.getClass());

        // Root 2: load -> local rearrange -> global rearrange -> package
        PhysicalOperator rearrange2 = phyPlan.getSuccessors(phyPlan.getRoots().get(2)).get(0);
        assertLoadRearrange(rearrange2, 0, "d1.txt");
        PhysicalOperator globalRearrange2 = phyPlan.getSuccessors(rearrange2).get(0);
        assertEquals(POGlobalRearrange.class, globalRearrange2.getClass());
        PhysicalOperator pack2 = phyPlan.getSuccessors(globalRearrange2).get(0);
        assertEquals(POPackage.class, pack2.getClass());

        // Each package is followed by a foreach projecting columns 1 and 2
        PhysicalOperator forEach = phyPlan.getSuccessors(pack).get(0);
        assertForEachProjects(forEach, 1, 2);
        PhysicalOperator forEach2 = phyPlan.getSuccessors(pack2).get(0);
        assertForEachProjects(forEach2, 1, 2);

        // Filter after the first foreach; roots: [null, project $5, null, project $2]
        PhysicalOperator fil = phyPlan.getSuccessors(forEach).get(0);
        assertEquals(POFilter.class, fil.getClass());
        List<PhysicalOperator> filRoots = ((POFilter) fil).getPlan().getRoots();
        assertNullConstant(filRoots.get(0));
        assertProjectOf(filRoots.get(1), 5);
        assertNullConstant(filRoots.get(2));
        assertProjectOf(filRoots.get(3), 2);

        // Filter after the second foreach feeds a local rearrange keyed on column 3
        PhysicalOperator fil2 = phyPlan.getSuccessors(forEach2).get(0);
        assertEquals(POFilter.class, fil2.getClass());
        PhysicalOperator rearrange3 = phyPlan.getSuccessors(fil2).get(0);
        assertEquals(POLocalRearrange.class, rearrange3.getClass());
        assertProjectOf(((POLocalRearrange) rearrange3).getPlans().get(0).getLeaves().get(0), 3);

        // Second filter roots: [null, project $3, null, project $0]
        List<PhysicalOperator> filRoots2 = ((POFilter) fil2).getPlan().getRoots();
        assertNullConstant(filRoots2.get(0));
        assertProjectOf(filRoots2.get(1), 3);
        assertNullConstant(filRoots2.get(2));
        assertProjectOf(filRoots2.get(3), 0);

        // The single leaf is the store into 'empty2', fed by the first filter.
        // Check the full name: "empty" alone would also match unrelated targets.
        PhysicalOperator stor = phyPlan.getLeaves().get(0);
        assertEquals(stor, phyPlan.getSuccessors(fil).get(0));
        assertStoreTo(stor, "empty2");
    }

    public void testPlanWithCast() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd.txt' as (id, c);");
        lpt.buildPlan("b = filter a by (int)id==10;");
        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");

        // check basics
        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
        assertEquals(3, phyPlan.size());
        assertEquals(1, phyPlan.getRoots().size());
        assertEquals(1, phyPlan.getLeaves().size());

        PhysicalOperator load = phyPlan.getRoots().get(0);
        assertLoadOf(load, "d.txt");

        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
        assertEquals(POFilter.class, fil.getClass());
        PhysicalPlan filPlan = ((POFilter) fil).getPlan();

        // (int)$0 == 10
        PhysicalOperator equal = filPlan.getLeaves().get(0);
        assertEquals(EqualToExpr.class, equal.getClass());
        assertEquals(DataType.BOOLEAN, ((EqualToExpr) equal).getResultType());

        PhysicalOperator constExpr = ((EqualToExpr) equal).getRhs();
        assertEquals(ConstantExpression.class, constExpr.getClass());
        assertEquals(10, ((ConstantExpression) constExpr).getValue());

        PhysicalOperator castExpr = ((EqualToExpr) equal).getLhs();
        assertEquals(POCast.class, castExpr.getClass());
        assertEquals(DataType.INTEGER, ((POCast) castExpr).getResultType());
        assertProjectOf(((POCast) castExpr).getInputs().get(0), 0);

        assertStoreTo(phyPlan.getLeaves().get(0), "empty");
    }

    public void testPlanWithGreaterThan() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd.txt' as (id, c);");
        lpt.buildPlan("b = filter a by (int)id>10;");
        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");

        // check basics
        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
        assertEquals(3, phyPlan.size());
        assertEquals(1, phyPlan.getRoots().size());
        assertEquals(1, phyPlan.getLeaves().size());

        PhysicalOperator load = phyPlan.getRoots().get(0);
        assertLoadOf(load, "d.txt");

        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
        assertEquals(POFilter.class, fil.getClass());
        PhysicalPlan filPlan = ((POFilter) fil).getPlan();

        // (int)$0 > 10
        PhysicalOperator greaterThan = filPlan.getLeaves().get(0);
        assertEquals(GreaterThanExpr.class, greaterThan.getClass());
        assertEquals(DataType.BOOLEAN, ((GreaterThanExpr) greaterThan).getResultType());

        PhysicalOperator constExpr = ((GreaterThanExpr) greaterThan).getRhs();
        assertEquals(ConstantExpression.class, constExpr.getClass());
        assertEquals(10, ((ConstantExpression) constExpr).getValue());

        PhysicalOperator castExpr = ((GreaterThanExpr) greaterThan).getLhs();
        assertEquals(POCast.class, castExpr.getClass());
        assertEquals(DataType.INTEGER, ((POCast) castExpr).getResultType());
        assertProjectOf(((POCast) castExpr).getInputs().get(0), 0);

        assertStoreTo(phyPlan.getLeaves().get(0), "empty");
    }

    public void testPlanWithLessThan() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd.txt' as (id, c);");
        lpt.buildPlan("b = filter a by (int)id<10;");
        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");

        // check basics
        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
        assertEquals(3, phyPlan.size());
        assertEquals(1, phyPlan.getRoots().size());
        assertEquals(1, phyPlan.getLeaves().size());

        PhysicalOperator load = phyPlan.getRoots().get(0);
        assertLoadOf(load, "d.txt");

        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
        assertEquals(POFilter.class, fil.getClass());
        PhysicalPlan filPlan = ((POFilter) fil).getPlan();

        // (int)$0 < 10
        PhysicalOperator lessThan = filPlan.getLeaves().get(0);
        assertEquals(LessThanExpr.class, lessThan.getClass());
        assertEquals(DataType.BOOLEAN, ((LessThanExpr) lessThan).getResultType());

        PhysicalOperator constExpr = ((LessThanExpr) lessThan).getRhs();
        assertEquals(ConstantExpression.class, constExpr.getClass());
        assertEquals(10, ((ConstantExpression) constExpr).getValue());

        PhysicalOperator castExpr = ((LessThanExpr) lessThan).getLhs();
        assertEquals(POCast.class, castExpr.getClass());
        assertEquals(DataType.INTEGER, ((POCast) castExpr).getResultType());
        assertProjectOf(((POCast) castExpr).getInputs().get(0), 0);

        assertStoreTo(phyPlan.getLeaves().get(0), "empty");
    }

    public void testForeachPlan() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd.txt' as (id, c);");
        lpt.buildPlan("b = foreach a generate id, c;");
        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");

        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);

        assertEquals(3, phyPlan.size());
        POLoad load = (POLoad) phyPlan.getRoots().get(0);
        assertEquals(POStore.class, phyPlan.getLeaves().get(0).getClass());
        POForEach foreach = (POForEach) phyPlan.getSuccessors(load).get(0);

        // One single-projection inner plan per generated column, both reading the load
        assertEquals(2, foreach.getInputPlans().size());
        assertSingleProjectFrom(foreach.getInputPlans().get(0), 0, load);
        assertSingleProjectFrom(foreach.getInputPlans().get(1), 1, load);

        Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
        assertFalse(flat[0]);
        assertFalse(flat[1]);
    }

    public void testForeachPlan2() throws Exception {
        LogicalPlanTester lpt = new LogicalPlanTester();
        lpt.buildPlan("a = load 'd.txt' as (id, c:bag{t:(s,v)});");
        lpt.buildPlan("b = foreach a generate id, flatten(c);");
        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");

        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);

        // The foreach schema is (id, s, v): the flattened bag's tuple fields replace c
        LogicalRelationalOperator ld = (LogicalRelationalOperator) newLogicalPlan.getSources().get(0);
        LogicalRelationalOperator fe = (LogicalRelationalOperator) newLogicalPlan.getSuccessors(ld).get(0);
        LogicalSchema ls = fe.getSchema();
        assertEquals(1, ls.getField(0).uid);
        assertEquals(4, ls.getField(1).uid);
        assertEquals(5, ls.getField(2).uid);

        LogicalSchema expected = new LogicalSchema();
        expected.addField(new LogicalFieldSchema("id", null, DataType.BYTEARRAY));
        expected.addField(new LogicalFieldSchema("s", null, DataType.BYTEARRAY));
        expected.addField(new LogicalFieldSchema("v", null, DataType.BYTEARRAY));
        assertTrue(expected.isEqual(ls));

        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);

        assertEquals(3, phyPlan.size());
        POLoad load = (POLoad) phyPlan.getRoots().get(0);
        assertEquals(POStore.class, phyPlan.getLeaves().get(0).getClass());
        POForEach foreach = (POForEach) phyPlan.getSuccessors(load).get(0);

        assertEquals(2, foreach.getInputPlans().size());
        assertSingleProjectFrom(foreach.getInputPlans().get(0), 0, load);
        assertSingleProjectFrom(foreach.getInputPlans().get(1), 1, load);

        // Only the second generated expression, flatten(c), is flattened
        Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
        assertFalse(flat[0]);
        assertTrue(flat[1]);
    }

}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java?rev=909165&view=auto ============================================================================== --- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java (added) +++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java Thu Feb 11 22:12:36 2010 @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.test; + +import java.io.IOException; + +import org.apache.pig.FuncSpec; +import org.apache.pig.data.DataType; +import org.apache.pig.experimental.logical.expression.AndExpression; +import org.apache.pig.experimental.logical.expression.ConstantExpression; +import org.apache.pig.experimental.logical.expression.EqualExpression; +import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan; +import org.apache.pig.experimental.logical.expression.ProjectExpression; +import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer; +import org.apache.pig.experimental.logical.relational.LOFilter; +import org.apache.pig.experimental.logical.relational.LOJoin; +import org.apache.pig.experimental.logical.relational.LOLoad; +import org.apache.pig.experimental.logical.relational.LogicalPlan; +import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator; +import org.apache.pig.experimental.logical.relational.LogicalSchema; +import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE; +import org.apache.pig.impl.io.FileSpec; +import org.apache.pig.impl.util.MultiMap; +import org.junit.Test; + +import junit.framework.TestCase; + +/** + * Test end to end logical optimizations. 
+ */ +public class TestExperimentalLogicalOptimizer extends TestCase { + + @Test + public void testFilterPushDown() throws IOException { + // A logical plan for: + // A = load 'bla' as (x, y); + // B = load 'morebla' as (a, b); + // C = join A on x, B on a; + // D = filter C by x = a and x = 0 and b = 1 and y = b; + // store D into 'whatever'; + + // A = load + LogicalPlan lp = new LogicalPlan(); + { + LogicalSchema aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema( + "x", null, DataType.INTEGER)); + aschema.addField(new LogicalSchema.LogicalFieldSchema( + "y", null, DataType.INTEGER)); + aschema.getField(0).uid = 1; + aschema.getField(1).uid = 2; + LOLoad A = new LOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, lp); + lp.add(A); + + // B = load + LogicalSchema bschema = new LogicalSchema(); + bschema.addField(new LogicalSchema.LogicalFieldSchema( + "a", null, DataType.INTEGER)); + bschema.addField(new LogicalSchema.LogicalFieldSchema( + "b", null, DataType.INTEGER)); + bschema.getField(0).uid = 3; + bschema.getField(1).uid = 4; + LOLoad B = new LOLoad(null, bschema, lp); + lp.add(B); + + // C = join + LogicalSchema cschema = new LogicalSchema(); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "x", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "y", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "a", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "b", null, DataType.INTEGER)); + cschema.getField(0).uid = 1; + cschema.getField(1).uid = 2; + cschema.getField(2).uid = 3; + cschema.getField(3).uid = 4; + LogicalExpressionPlan aprojplan = new LogicalExpressionPlan(); + ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0); + x.neverUseForRealSetUid(1); + LogicalExpressionPlan bprojplan = new LogicalExpressionPlan(); + ProjectExpression y = new 
ProjectExpression(bprojplan, DataType.INTEGER, 1, 0); + y.neverUseForRealSetUid(3); + MultiMap mm = + new MultiMap(); + mm.put(0, aprojplan); + mm.put(1, bprojplan); + LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true}); + C.neverUseForRealSetSchema(cschema); + lp.add(new LogicalRelationalOperator[] {A, B}, C, null); + + // D = filter + LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); + ProjectExpression fx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0); + fx.neverUseForRealSetUid(1); + ConstantExpression fc0 = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0)); + EqualExpression eq1 = new EqualExpression(filterPlan, fx, fc0); + ProjectExpression fanotherx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0); + fanotherx.neverUseForRealSetUid(1); + ProjectExpression fa = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 2); + fa.neverUseForRealSetUid(3); + EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa); + AndExpression and1 = new AndExpression(filterPlan, eq1, eq2); + ProjectExpression fb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3); + fb.neverUseForRealSetUid(4); + ConstantExpression fc1 = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(1)); + EqualExpression eq3 = new EqualExpression(filterPlan, fb, fc1); + AndExpression and2 = new AndExpression(filterPlan, and1, eq3); + ProjectExpression fanotherb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3); + fanotherb.neverUseForRealSetUid(4); + ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1); + fy.neverUseForRealSetUid(2); + EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb); + new AndExpression(filterPlan, and2, eq4); + + LOFilter D = new LOFilter(lp, filterPlan); + D.neverUseForRealSetSchema(cschema); + // Connect D to B, since the transform has happened. 
+ lp.add(C, D, (LogicalRelationalOperator)null); + } + + LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(lp, 500); + optimizer.optimize(); + + LogicalPlan expected = new LogicalPlan(); + { + // A = load + LogicalSchema aschema = new LogicalSchema(); + aschema.addField(new LogicalSchema.LogicalFieldSchema( + "x", null, DataType.INTEGER)); + aschema.addField(new LogicalSchema.LogicalFieldSchema( + "y", null, DataType.INTEGER)); + aschema.getField(0).uid = 1; + aschema.getField(1).uid = 2; + LOLoad A = new LOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, expected); + expected.add(A); + + // DA = filter + LogicalExpressionPlan DAfilterPlan = new LogicalExpressionPlan(); + ProjectExpression fx = new ProjectExpression(DAfilterPlan, DataType.INTEGER, 0, 0); + fx.neverUseForRealSetUid(1); + ConstantExpression fc0 = new ConstantExpression(DAfilterPlan, DataType.INTEGER, new Integer(0)); + new EqualExpression(DAfilterPlan, fx, fc0); + + LOFilter DA = new LOFilter(expected, DAfilterPlan); + DA.neverUseForRealSetSchema(aschema); + expected.add(A, DA, (LogicalRelationalOperator)null); + + // B = load + LogicalSchema bschema = new LogicalSchema(); + bschema.addField(new LogicalSchema.LogicalFieldSchema( + "a", null, DataType.INTEGER)); + bschema.addField(new LogicalSchema.LogicalFieldSchema( + "b", null, DataType.INTEGER)); + bschema.getField(0).uid = 3; + bschema.getField(1).uid = 4; + LOLoad B = new LOLoad(null, bschema, expected); + expected.add(B); + + // DB = filter + LogicalExpressionPlan DBfilterPlan = new LogicalExpressionPlan(); + ProjectExpression fb = new ProjectExpression(DBfilterPlan, DataType.INTEGER, 0, 1); + fb.neverUseForRealSetUid(4); + ConstantExpression fc1 = new ConstantExpression(DBfilterPlan, DataType.INTEGER, new Integer(1)); + new EqualExpression(DBfilterPlan, fb, fc1); + + LOFilter DB = new LOFilter(expected, DBfilterPlan); + DB.neverUseForRealSetSchema(bschema); + expected.add(B, DB, (LogicalRelationalOperator)null); + + 
// C = join + LogicalSchema cschema = new LogicalSchema(); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "x", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "y", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "a", null, DataType.INTEGER)); + cschema.addField(new LogicalSchema.LogicalFieldSchema( + "b", null, DataType.INTEGER)); + cschema.getField(0).uid = 1; + cschema.getField(1).uid = 2; + cschema.getField(2).uid = 3; + cschema.getField(3).uid = 4; + LogicalExpressionPlan aprojplan = new LogicalExpressionPlan(); + ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0); + x.neverUseForRealSetUid(1); + LogicalExpressionPlan bprojplan = new LogicalExpressionPlan(); + ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0); + y.neverUseForRealSetUid(3); + MultiMap mm = + new MultiMap(); + mm.put(0, aprojplan); + mm.put(1, bprojplan); + LOJoin C = new LOJoin(expected, mm, JOINTYPE.HASH, new boolean[] {true, true}); + C.neverUseForRealSetSchema(cschema); + expected.add(new LogicalRelationalOperator[] {DA, DB}, C, null); + + // D = filter + LogicalExpressionPlan filterPlan = new LogicalExpressionPlan(); + ProjectExpression fanotherx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0); + fanotherx.neverUseForRealSetUid(1); + ProjectExpression fa = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 2); + fa.neverUseForRealSetUid(3); + EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa); + ProjectExpression fanotherb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3); + fanotherb.neverUseForRealSetUid(4); + ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1); + fy.neverUseForRealSetUid(2); + EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb); + new AndExpression(filterPlan, eq2, eq4); + + LOFilter D = new LOFilter(expected, filterPlan); + 
D.neverUseForRealSetSchema(cschema); + expected.add(C, D, (LogicalRelationalOperator)null); + } + + assertTrue( lp.isEqual(expected) ); + // assertEquals(lp, expected); + } + +}