From: hyunsik@apache.org
To: commits@tajo.apache.org
Reply-To: dev@tajo.apache.org
Date: Fri, 18 Apr 2014 09:19:27 -0000
Subject: [05/51] [partial] TAJO-752: Escalate sub modules in tajo-core into the top-level modules. (hyunsik)

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
new file mode 100644
index 0000000..b56ab47
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.datum.TextDatum;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PlanningException;
+import org.apache.tajo.engine.planner.logical.GroupbyNode;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.TajoMaster;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static junit.framework.Assert.assertNotNull;
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBroadcastJoinPlan {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestBroadcastJoinPlan";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private Path testDir;
+
+  private TableDesc smallTable1;
+  private TableDesc smallTable2;
+  private TableDesc smallTable3;
+  private TableDesc largeTable1;
+  private TableDesc largeTable2;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    conf = util.getConfiguration();
+    conf.setLongVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD, 500 * 1024);
+    conf.setBoolVar(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO, true);
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog = util.startCatalogCluster().getCatalog();
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    util.getMiniCatalogCluster().getCatalogServer().reloadBuiltinFunctions(TajoMaster.initBuiltinFunctions());
+
+    Schema smallTable1Schema = new Schema();
+    smallTable1Schema.addColumn("small1_id", TajoDataTypes.Type.INT4);
+    smallTable1Schema.addColumn("small1_contents", TajoDataTypes.Type.TEXT);
+    smallTable1 = makeTestData("default.small1", smallTable1Schema, 10 * 1024);
+
+    Schema smallTable2Schema = new Schema();
+    smallTable2Schema.addColumn("small2_id", TajoDataTypes.Type.INT4);
+    smallTable2Schema.addColumn("small2_contents", TajoDataTypes.Type.TEXT);
+    smallTable2 = makeTestData("default.small2", smallTable2Schema, 10 * 1024);
+
+    Schema smallTable3Schema = new Schema();
+    smallTable3Schema.addColumn("small3_id", TajoDataTypes.Type.INT4);
+    smallTable3Schema.addColumn("small3_contents", TajoDataTypes.Type.TEXT);
+    smallTable3 = makeTestData("default.small3", smallTable3Schema, 10 * 1024);
+
+    Schema largeTable1Schema = new Schema();
+    largeTable1Schema.addColumn("large1_id", TajoDataTypes.Type.INT4);
+    largeTable1Schema.addColumn("large1_contents", TajoDataTypes.Type.TEXT);
+    largeTable1 = makeTestData("default.large1", largeTable1Schema, 1024 * 1024); // 1MB
+
+    Schema largeTable2Schema = new Schema();
+    largeTable2Schema.addColumn("large2_id", TajoDataTypes.Type.INT4);
+    largeTable2Schema.addColumn("large2_contents", TajoDataTypes.Type.TEXT);
+    largeTable2 = makeTestData("default.large2", largeTable2Schema, 1024 * 1024); // 1MB
+
+    catalog.createTable(smallTable1);
+    catalog.createTable(smallTable2);
+    catalog.createTable(largeTable1);
+    catalog.createTable(largeTable2);
+
+    analyzer = new SQLAnalyzer();
+  }
+
+  private TableDesc makeTestData(String tableName, Schema schema, int dataSize) throws Exception {
+    TableMeta tableMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
+    Path dataPath = new Path(testDir, tableName + ".csv");
+
+    String contentsData = "";
+    for (int i = 0; i < 1000; i++) {
+      for (int j = 0; j < 10; j++) {
+        contentsData += j;
+      }
+    }
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(tableMeta, schema,
+        dataPath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    int writtenSize = 0;
+    int count = 0;
+    while (true) {
+      TextDatum textDatum = DatumFactory.createText(count + "_" + contentsData);
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(count), textDatum });
+      appender.addTuple(tuple);
+      count++; // advance the row id so each appended tuple is distinct
+
+      writtenSize += textDatum.size();
+      if (writtenSize >= dataSize) {
+        break;
+      }
+    }
+
+    appender.flush();
+    appender.close();
+
+    TableDesc tableDesc = CatalogUtil.newTableDesc(tableName, schema, tableMeta, dataPath);
+    TableStats tableStats = new TableStats();
+    FileSystem fs = dataPath.getFileSystem(conf);
+    tableStats.setNumBytes(fs.getFileStatus(dataPath).getLen());
+
+    tableDesc.setStats(tableStats);
+
+    return tableDesc;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public final void testBroadcastJoin() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join small1 on large1_id = small1_id " +
+        "join small2 on small1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+     |-eb_1395714781593_0000_000007 (TERMINAL)
+         |-eb_1395714781593_0000_000006 (ROOT)
+             |-eb_1395714781593_0000_000005 (LEAF)
+     */
+
+    ExecutionBlock terminalEB = masterPlan.getRoot();
+    assertEquals(1, masterPlan.getChildCount(terminalEB.getId()));
+
+    ExecutionBlock rootEB = masterPlan.getChild(terminalEB.getId(), 0);
+    assertEquals(1, masterPlan.getChildCount(rootEB.getId()));
+
+    ExecutionBlock leafEB = masterPlan.getChild(rootEB.getId(), 0);
+    assertNotNull(leafEB);
+
+    assertEquals(0, masterPlan.getChildCount(leafEB.getId()));
+    Collection<String> broadcastTables = leafEB.getBroadcastTables();
+    assertEquals(2, broadcastTables.size());
+
+    assertTrue(broadcastTables.contains("default.small1"));
+    assertTrue(broadcastTables.contains("default.small2"));
+    assertTrue(!broadcastTables.contains("default.large1"));
+
+    LogicalNode leafNode = leafEB.getPlan();
+    assertEquals(NodeType.GROUP_BY, leafNode.getType());
+
+    LogicalNode joinNode = ((GroupbyNode)leafNode).getChild();
+    assertEquals(NodeType.JOIN, joinNode.getType());
+
+    LogicalNode leftNode = ((JoinNode)joinNode).getLeftChild();
+    LogicalNode rightNode = ((JoinNode)joinNode).getRightChild();
+
+    assertEquals(NodeType.JOIN, leftNode.getType());
+    assertEquals(NodeType.SCAN, rightNode.getType());
+
+    LogicalNode lastLeftNode = ((JoinNode)leftNode).getLeftChild();
+    LogicalNode lastRightNode = ((JoinNode)leftNode).getRightChild();
+
+    assertEquals(NodeType.SCAN, lastLeftNode.getType());
+    assertEquals(NodeType.SCAN, lastRightNode.getType());
+  }
+
+  @Test
+  public final void testNotBroadcastJoinTwoLargeTable() throws IOException, PlanningException {
+    // Both operands are large tables, so this query must not be planned as a broadcast join.
+    String query = "select count(*) from large1 " +
+        "join large2 on large1_id = large2_id ";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      Collection<String> broadcastTables = eb.getBroadcastTables();
+      assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+    }
+  }
+
+  @Test
+  public final void testTwoBroadcastJoin() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join small1 on large1_id = small1_id " +
+        "join large2 on large1_id = large2_id " +
+        "join small2 on large2_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+     |-eb_1395736346625_0000_000009
+         |-eb_1395736346625_0000_000008 (GROUP-BY)
+             |-eb_1395736346625_0000_000007 (GROUP-BY, JOIN)
+                 |-eb_1395736346625_0000_000006 (LEAF, JOIN)
+                 |-eb_1395736346625_0000_000003 (LEAF, JOIN)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if (index == 0) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+
+        assertTrue(!broadcastTables.contains("default.large1"));
+        assertTrue(broadcastTables.contains("default.small1"));
+      }
+      else if (index == 1) {
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+        assertTrue(!broadcastTables.contains("default.large2"));
+        assertTrue(broadcastTables.contains("default.small2"));
+      }
+      index++;
+    }
+
+    assertEquals(5, index);
+  }
+
+  @Test
+  public final void testNotBroadcastJoinSubquery() throws IOException, PlanningException {
+    // The derived table (subquery) prevents this plan from using a broadcast join.
+    String query = "select count(*) from large1 " +
+        "join (select * from small1) a on large1_id = a.small1_id " +
+        "join small2 on a.small1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+     |-eb_1395749810370_0000_000007
+         |-eb_1395749810370_0000_000006 (GROUP-BY)
+             |-eb_1395749810370_0000_000005 (GROUP-BY, JOIN)
+                 |-eb_1395749810370_0000_000004 (LEAF, SCAN, large1)
+                 |-eb_1395749810370_0000_000003 (JOIN)
+                     |-eb_1395749810370_0000_000002 (LEAF, SCAN, small2)
+                     |-eb_1395749810370_0000_000001 (LEAF, TABLE_SUBQUERY, small1)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      Collection<String> broadcastTables = eb.getBroadcastTables();
+      assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      index++;
+    }
+
+    assertEquals(7, index);
+  }
+
+  @Test
+  public final void testBroadcastJoinSubquery() throws IOException, PlanningException {
+    String query = "select count(*) from large1 " +
+        "join (select * from small1) a on large1_id = a.small1_id " +
+        "join small2 on large1_id = small2_id";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext();
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    /*
+     |-eb_1395794091662_0000_000007
+         |-eb_1395794091662_0000_000006
+             |-eb_1395794091662_0000_000005 (JOIN)
+                 |-eb_1395794091662_0000_000004 (LEAF, SUBQUERY)
+                 |-eb_1395794091662_0000_000003 (LEAF, JOIN)
+     */
+
+    ExecutionBlockCursor ebCursor = new ExecutionBlockCursor(masterPlan);
+    int index = 0;
+    while (ebCursor.hasNext()) {
+      ExecutionBlock eb = ebCursor.nextBlock();
+      if (index == 0) {
+        // LEAF, JOIN
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertEquals(1, broadcastTables.size());
+
+        assertTrue(!broadcastTables.contains("default.large1"));
+        assertTrue(broadcastTables.contains("default.small2"));
+      } else if (index == 1) {
+        // LEAF, SUBQUERY
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      } else if (index == 2) {
+        // JOIN
+        Collection<String> broadcastTables = eb.getBroadcastTables();
+        assertTrue(broadcastTables == null || broadcastTables.isEmpty());
+      }
+      index++;
+    }
+
+    assertEquals(5, index);
+  }
+}
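The tests above pin down a single planner rule: a relation qualifies for broadcast when its stored volume does not exceed TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_THRESHOLD (set to 500 KB in setUp, with roughly 10 KB "small" tables and 1 MB "large" tables on either side of it). A minimal sketch of that size test, using the TableStats populated in makeTestData; the helper name is illustrative, not GlobalPlanner's actual code:

  // Sketch only: a scan qualifies for broadcast when its on-disk volume is
  // at or below the configured threshold. setUp() sets the threshold to
  // 500 * 1024 bytes, so the ~10KB tables qualify and the ~1MB tables do not.
  static boolean canBroadcast(TableStats stats, long thresholdBytes) {
    return stats.getNumBytes() <= thresholdBytes;
  }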
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
new file mode 100644
index 0000000..ab56bea
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestMasterPlan.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.global;
+
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.ipc.TajoWorkerProtocol;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestMasterPlan {
+
+  @Test
+  public void testConnect() {
+    MasterPlan masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+
+    ExecutionBlock eb1 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb2 = masterPlan.newExecutionBlock();
+    ExecutionBlock eb3 = masterPlan.newExecutionBlock();
+
+    masterPlan.addConnect(eb1, eb2, TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+
+    masterPlan.addConnect(eb3, eb2, TajoWorkerProtocol.ShuffleType.RANGE_SHUFFLE);
+    assertTrue(masterPlan.isConnected(eb1.getId(), eb2.getId()));
+    assertTrue(masterPlan.isConnected(eb3.getId(), eb2.getId()));
+
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb1.getId()));
+    assertTrue(masterPlan.isReverseConnected(eb2.getId(), eb3.getId()));
+
+    masterPlan.disconnect(eb3, eb2);
+    assertFalse(masterPlan.isConnected(eb3, eb2));
+    assertFalse(masterPlan.isReverseConnected(eb2, eb3));
+  }
+}
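testConnect verifies that every edge added by addConnect is visible from both endpoints and that disconnect removes both directions. Assuming MasterPlan keeps one forward and one reverse adjacency map (a sketch; the fields and method below are hypothetical, not the real implementation):

  // Hypothetical bookkeeping behind isConnected/isReverseConnected: two maps
  // that must always be mutated together, which is exactly what the test probes.
  private final Map<ExecutionBlockId, Set<ExecutionBlockId>> forward =
      new HashMap<ExecutionBlockId, Set<ExecutionBlockId>>();
  private final Map<ExecutionBlockId, Set<ExecutionBlockId>> reverse =
      new HashMap<ExecutionBlockId, Set<ExecutionBlockId>>();

  void connect(ExecutionBlockId child, ExecutionBlockId parent) {
    if (!forward.containsKey(child)) {
      forward.put(child, new HashSet<ExecutionBlockId>());
    }
    if (!reverse.containsKey(parent)) {
      reverse.put(parent, new HashSet<ExecutionBlockId>());
    }
    forward.get(child).add(parent);   // makes isConnected(child, parent) true
    reverse.get(parent).add(child);   // makes isReverseConnected(parent, child) true
  }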
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
new file mode 100644
index 0000000..c79796b
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBNLJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestBNLJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private static int OUTER_TUPLE_NUM = 1000;
+  private static int INNER_TUPLE_NUM = 1000;
+
+  private TableDesc employee;
+  private TableDesc people;
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("memid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(i), DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("dept_" + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+    employee = CatalogUtil.newTableDesc("default.employee", schema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+
+    Schema peopleSchema = new Schema();
+    peopleSchema.addColumn("empid", Type.INT4);
+    peopleSchema.addColumn("fk_memid", Type.INT4);
+    peopleSchema.addColumn("name", Type.TEXT);
+    peopleSchema.addColumn("age", Type.INT4);
+    TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path peoplePath = new Path(testDir, "people.csv");
+    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender.init();
+    tuple = new VTuple(peopleSchema.size());
+    for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("name_" + i),
+          DatumFactory.createInt4(30 + i) });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    people = CatalogUtil.newTableDesc("default.people", peopleSchema, peopleMeta, peoplePath);
+    catalog.createTable(people);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  // employee (managerId, empId, memId, deptName)
+  // people (empId, fk_memId, name, age)
+  String[] QUERIES = {
+      "select managerId, e.empId, deptName, e.memId from employee as e, people p",
+      "select managerId, e.empId, deptName, e.memId from employee as e " +
+          "inner join people as p on e.empId = p.empId and e.memId = p.fk_memId" };
+
+  @Test
+  public final void testBNLCrossJoin() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLCrossJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
+
+    int i = 0;
+    exec.init();
+    while (exec.next() != null) {
+      i++;
+    }
+    exec.close();
+    assertEquals(OUTER_TUPLE_NUM * INNER_TUPLE_NUM / 2, i); // 1000 employee rows x 500 people rows
+  }
+
+  @Test
+  public final void testBNLInnerJoin() throws IOException, PlanningException {
+    Expr context = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(),
+        context).getRootBlock().getRoot();
+
+    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(), people.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
+
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testBNLInnerJoin");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(),
+        merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof BNLJoinExec);
+
+    Tuple tuple;
+    int i = 1;
+    int count = 0;
+    exec.init();
+    while ((tuple = exec.next()) != null) {
+      count++;
+      assertTrue(i == tuple.get(0).asInt4());
+      assertTrue(i == tuple.get(1).asInt4());
+      assertTrue(("dept_" + i).equals(tuple.get(2).asChars()));
+      assertTrue(10 + i == tuple.get(3).asInt4());
+      i += 2;
+    }
+    exec.close();
+    assertEquals(INNER_TUPLE_NUM / 2, count);
+  }
+}
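Both tests force JoinAlgorithm.BLOCK_NESTED_LOOP_JOIN through the Enforcer. A block-nested-loop join buffers a block of outer tuples and scans the inner relation once per block instead of once per outer tuple. In outline (a sketch; joinKeyMatches and emit are placeholders, not BNLJoinExec methods):

  // One inner-relation scan per block of outer rows; for the cross join in
  // QUERIES[0] every pair matches, giving 1000 x 500 = 500,000 output rows.
  for (int start = 0; start < outer.size(); start += blockSize) {
    List<Tuple> block = outer.subList(start, Math.min(start + blockSize, outer.size()));
    for (Tuple innerTuple : inner) {
      for (Tuple outerTuple : block) {
        if (joinKeyMatches(outerTuple, innerTuple)) {
          emit(outerTuple, innerTuple);
        }
      }
    }
  }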
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
new file mode 100644
index 0000000..a47bde3
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.ScanNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.FragmentConvertor;
+import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.Stack;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+
+public class TestBSTIndexExec {
+
+  private TajoConf conf;
+  private Path idxPath;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private LogicalOptimizer optimizer;
+  private AbstractStorageManager sm;
+  private Schema idxSchema;
+  private TupleComparator comp;
+  private BSTIndex.BSTIndexWriter writer;
+  private HashMap<Integer, Integer> randomValues;
+  private int rndKey = -1;
+  private FileSystem fs;
+  private TableMeta meta;
+  private Path tablePath;
+
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TajoTestingCluster util;
+
+  @Before
+  public void setup() throws Exception {
+    this.randomValues = new HashMap<Integer, Integer>();
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    util.startCatalogCluster();
+    catalog = util.getMiniCatalogCluster().getCatalog();
+
+    Path workDir = CommonTestingUtil.getTestDir();
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    sm = StorageManagerFactory.getStorageManager(conf, workDir);
+
+    idxPath = new Path(workDir, "test.idx");
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    this.idxSchema = new Schema();
+    idxSchema.addColumn("managerid", Type.INT4);
+    SortSpec[] sortKeys = new SortSpec[1];
+    sortKeys[0] = new SortSpec(idxSchema.getColumn("managerid"), true, false);
+    this.comp = new TupleComparator(idxSchema, sortKeys);
+
+    this.writer = new BSTIndex(conf).getIndexWriter(idxPath,
+        BSTIndex.TWO_LEVEL_INDEX, this.idxSchema, this.comp);
+    writer.setLoadNum(100);
+    writer.open();
+    long offset;
+
+    meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    tablePath = StorageUtil.concatPath(workDir, "employee", "table.csv");
+    fs = tablePath.getFileSystem(conf);
+    fs.mkdirs(tablePath.getParent());
+
+    FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+        tablePath);
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < 10000; i++) {
+      Tuple key = new VTuple(this.idxSchema.size());
+      int rndKey = rnd.nextInt(250); // note: intentionally shadows the field of the same name
+      if (this.randomValues.containsKey(rndKey)) {
+        int t = this.randomValues.remove(rndKey) + 1;
+        this.randomValues.put(rndKey, t);
+      } else {
+        this.randomValues.put(rndKey, 1);
+      }
+
+      key.put(new Datum[] { DatumFactory.createInt4(rndKey) });
+      tuple.put(new Datum[] { DatumFactory.createInt4(rndKey),
+          DatumFactory.createInt4(rnd.nextInt(10)),
+          DatumFactory.createText("dept_" + rnd.nextInt(10)) });
+      offset = appender.getOffset();
+      appender.addTuple(tuple);
+      writer.write(key, offset);
+    }
+    appender.flush();
+    appender.close();
+    writer.close();
+
+    TableDesc desc = new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "employee"), schema, meta,
+        sm.getTablePath("employee"));
+    catalog.createTable(desc);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+    optimizer = new LogicalOptimizer(conf);
+  }
+
+  @After
+  public void tearDown() {
+    util.shutdownCatalogCluster();
+  }
+
+  @Test
+  public void testEqual() throws Exception {
+    if (conf.getBoolean("tajo.storage.manager.v2", false)) {
+      return;
+    }
+    this.rndKey = rnd.nextInt(250);
+    final String QUERY = "select * from employee where managerId = " + rndKey;
+
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", meta, tablePath, Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEqual");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    Expr expr = analyzer.parse(QUERY);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    TmpPlanner phyPlanner = new TmpPlanner(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    int tupleCount = this.randomValues.get(rndKey);
+    int counter = 0;
+    exec.init();
+    while (exec.next() != null) {
+      counter++;
+    }
+    exec.close();
+    assertEquals(tupleCount, counter);
+  }
+
+  private class TmpPlanner extends PhysicalPlannerImpl {
+    public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
+      super(conf, sm);
+    }
+
+    @Override
+    public PhysicalExec createScanPlan(TaskAttemptContext ctx, ScanNode scanNode, Stack<LogicalNode> stack)
+        throws IOException {
+      Preconditions.checkNotNull(ctx.getTable(scanNode.getTableName()),
+          "Error: There is no table matched to %s", scanNode.getTableName());
+
+      List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), meta.getStoreType(),
+          ctx.getTables(scanNode.getTableName()));
+
+      Datum[] datum = new Datum[]{ DatumFactory.createInt4(rndKey) };
+
+      return new BSTIndexScanExec(ctx, sm, scanNode, fragments.get(0), idxPath, idxSchema, comp, datum);
+    }
+  }
+}
\ No newline at end of file
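setup() writes one (sort key, file offset) pair into the two-level BST index per appended tuple, and testEqual counts the rows returned for one random key against the tally kept in randomValues. The access pattern BSTIndexScanExec relies on, sketched with hypothetical reader/scanner calls rather than the verified Tajo API:

  // Sketch of an index-backed equality scan: seek to the key's first recorded
  // offset, then read while the leading column still equals the search key.
  int countMatches(Tuple searchKey) throws IOException {
    int matched = 0;
    long offset = indexReader.find(searchKey); // hypothetical lookup call
    if (offset >= 0) {
      scanner.seek(offset);
      Tuple t;
      while ((t = scanner.next()) != null
          && t.get(0).asInt4() == searchKey.get(0).asInt4()) {
        matched++;
      }
    }
    return matched;
  }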
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
new file mode 100644
index 0000000..ff3befe
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestExternalSortExec {
+  private TajoConf conf;
+  private TajoTestingCluster util;
+  private final String TEST_PATH = "target/test-data/TestExternalSortExec";
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+
+  private final int numTuple = 100000;
+  private Random rnd = new Random(System.currentTimeMillis());
+
+  private TableDesc employee;
+
+  @Before
+  public void setUp() throws Exception {
+    this.conf = new TajoConf();
+    util = new TajoTestingCluster();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    Schema schema = new Schema();
+    schema.addColumn("managerid", Type.INT4);
+    schema.addColumn("empid", Type.INT4);
+    schema.addColumn("deptname", Type.TEXT);
+
+    TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path employeePath = new Path(testDir, "employee.csv");
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    appender.enableStats();
+    appender.init();
+    Tuple tuple = new VTuple(schema.size());
+    for (int i = 0; i < numTuple; i++) {
+      tuple.put(new Datum[] {
+          DatumFactory.createInt4(rnd.nextInt(50)),
+          DatumFactory.createInt4(rnd.nextInt(100)),
+          DatumFactory.createText("dept_" + i),
+      });
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+    System.out.println(appender.getStats().getNumRows() + " rows (" + (appender.getStats().getNumBytes() / 1048576)
+        + " MB)");
+
+    employee = new TableDesc("default.employee", schema, employeeMeta, employeePath);
+    catalog.createTable(employee);
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    CommonTestingUtil.cleanupTestDir(TEST_PATH);
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      "select managerId, empId from employee order by managerId, empId"
+  };
+
+  @Test
+  public final void testNext() throws IOException, PlanningException {
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(), employee.getPath(),
+        Integer.MAX_VALUE);
+    Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummySession(), expr);
+    LogicalNode rootNode = plan.getRootBlock().getRoot();
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // TODO - should be planned with the user's optimization hint
+    if (!(proj.getChild() instanceof ExternalSortExec)) {
+      UnaryPhysicalExec sortExec = proj.getChild();
+      SeqScanExec scan = sortExec.getChild();
+
+      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
+          ((MemSortExec) sortExec).getPlan(), scan);
+      proj.setChild(extSort);
+    }
+
+    Tuple tuple;
+    Tuple preVal = null;
+    Tuple curVal;
+    int cnt = 0;
+    exec.init();
+    long start = System.currentTimeMillis();
+    TupleComparator comparator = new TupleComparator(proj.getSchema(),
+        new SortSpec[] {
+            new SortSpec(new Column("managerid", Type.INT4)),
+            new SortSpec(new Column("empid", Type.INT4))
+        });
+
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    long end = System.currentTimeMillis();
+    assertEquals(numTuple, cnt);
+
+    // for rescan test
+    preVal = null;
+    exec.rescan();
+    cnt = 0;
+    while ((tuple = exec.next()) != null) {
+      curVal = tuple;
+      if (preVal != null) {
+        assertTrue("prev: " + preVal + ", but cur: " + curVal, comparator.compare(preVal, curVal) <= 0);
+      }
+      preVal = curVal;
+      cnt++;
+    }
+    assertEquals(numTuple, cnt);
+    exec.close();
+    System.out.println("Sort Time: " + (end - start) + " msec");
+  }
+}
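With 100,000 input rows and WORKER_TEMPORAL_DIR pointed at the test directory, the sort is expected to spill sorted runs to disk and merge them, which is what the ordered-output and rescan assertions exercise. The merge pass in outline, assuming each spilled run is wrapped in a cursor ordered by the same TupleComparator (RunCursor and byCurrentTuple are made-up helpers, not Tajo code):

  // k-way merge of sorted spill runs: a priority queue keyed by each run's
  // current tuple yields a globally sorted stream.
  PriorityQueue<RunCursor> heap =
      new PriorityQueue<RunCursor>(runs.size(), byCurrentTuple(comparator));
  for (RunCursor run : runs) {
    if (run.advance()) heap.add(run);           // prime with each run's first tuple
  }
  while (!heap.isEmpty()) {
    RunCursor smallest = heap.poll();
    output.addTuple(smallest.current());
    if (smallest.advance()) heap.add(smallest); // refill from the run just consumed
  }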
http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
new file mode 100644
index 0000000..b05688d
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestFullOuterHashJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestFullOuterHashJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name | loc_id
+    //--------------------------------
+    //  0     | dep_0    | 1000
+    //  1     | dep_1    | 1001
+    //  2     | dep_2    | 1002
+    //  3     | dep_3    | 1003
+    //  4     | dep_4    | 1004
+    //  5     | dep_5    | 1005
+    //  6     | dep_6    | 1006
+    //  7     | dep_7    | 1007
+    //  8     | dep_8    | 1008
+    //  9     | dep_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+    TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- job3 ------------------------------
+    // job_id | job_title
+    // ----------------------
+    //  101   | job_101
+    //  102   | job_102
+    //  103   | job_103
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+    //---------------------emp3 --------------------
+    // emp_id | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    //  11    | fn_11      | ln_11     |  1     | 123    | 101
+    //  13    | fn_13      | ln_13     |  3     | 369    | 103
+    //  15    | fn_15      | ln_15     |  5     | 615    | null
+    //  17    | fn_17      | ln_17     |  7     | 861    | null
+    //  19    | fn_19      | ln_19     |  9     | 1107   | null
+    //  21    | fn_21      | ln_21     |  1     | 123    | 101
+    //  23    | fn_23      | ln_23     |  3     | 369    | 103
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //---------------------phone3 --------------------
+    // emp_id | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from dep3 full outer join emp3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the left side
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+      // [3] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id"
+  };
+
+  @Test
+  public final void testFullOuterHashJoinExec0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec1() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuter_HashJoinExec1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec2() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      // TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterHashJoinExec3() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.IN_MEMORY_HASH_JOIN);
+
+    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+        Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestFullOuterHashJoinExec3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(), merged,
+        workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof HashFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/6594ac1c/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
new file mode 100644
index 0000000..0386179
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -0,0 +1,536 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.engine.planner.physical;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.LocalTajoTestingUtility;
+import org.apache.tajo.TajoConstants;
+import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.*;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.*;
+import org.apache.tajo.engine.planner.enforce.Enforcer;
+import org.apache.tajo.engine.planner.logical.JoinNode;
+import org.apache.tajo.engine.planner.logical.LogicalNode;
+import org.apache.tajo.engine.planner.logical.NodeType;
+import org.apache.tajo.master.session.Session;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.apache.tajo.util.TUtil;
+import org.apache.tajo.worker.TaskAttemptContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
+import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.JoinEnforce.JoinAlgorithm;
+import static org.junit.Assert.*;
+
+public class TestFullOuterMergeJoinExec {
+  private TajoConf conf;
+  private final String TEST_PATH = "target/test-data/TestFullOuterMergeJoinExec";
+  private TajoTestingCluster util;
+  private CatalogService catalog;
+  private SQLAnalyzer analyzer;
+  private LogicalPlanner planner;
+  private AbstractStorageManager sm;
+  private Path testDir;
+  private static final Session session = LocalTajoTestingUtility.createDummySession();
+
+  private TableDesc dep3;
+  private TableDesc dep4;
+  private TableDesc job3;
+  private TableDesc emp3;
+  private TableDesc phone3;
+
+  private final String DEP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep3");
+  private final String DEP4_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "dep4");
+  private final String JOB3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "job3");
+  private final String EMP3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "emp3");
+  private final String PHONE3_NAME = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, "phone3");
+
+  @Before
+  public void setUp() throws Exception {
+    util = new TajoTestingCluster();
+    util.initTestDir();
+    catalog = util.startCatalogCluster().getCatalog();
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
+    catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
+
+    conf = util.getConfiguration();
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+
+    //----------------- dep3 ------------------------------
+    // dep_id | dep_name | loc_id
+    //--------------------------------
+    // 0      | dep_0    | 1000
+    // 1      | dep_1    | 1001
+    // 2      | dep_2    | 1002
+    // 3      | dep_3    | 1003
+    // 4      | dep_4    | 1004
+    // 5      | dep_5    | 1005
+    // 6      | dep_6    | 1006
+    // 7      | dep_7    | 1007
+    // 8      | dep_8    | 1008
+    // 9      | dep_9    | 1009
+    Schema dep3Schema = new Schema();
+    dep3Schema.addColumn("dep_id", Type.INT4);
+    dep3Schema.addColumn("dep_name", Type.TEXT);
+    dep3Schema.addColumn("loc_id", Type.INT4);
+
+    TableMeta dep3Meta =
+        CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep3Path = new Path(testDir, "dep3.csv");
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    appender1.init();
+    Tuple tuple = new VTuple(dep3Schema.size());
+    for (int i = 0; i < 10; i++) {
+      tuple.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender1.addTuple(tuple);
+    }
+    appender1.flush();
+    appender1.close();
+    dep3 = CatalogUtil.newTableDesc(DEP3_NAME, dep3Schema, dep3Meta, dep3Path);
+    catalog.createTable(dep3);
+
+    //----------------- dep4 ------------------------------
+    // dep_id | dep_name | loc_id
+    //--------------------------------
+    // 0      | dep_0    | 1000
+    // 1      | dep_1    | 1001
+    // 2      | dep_2    | 1002
+    // 3      | dep_3    | 1003
+    // 4      | dep_4    | 1004
+    // 5      | dep_5    | 1005
+    // 6      | dep_6    | 1006
+    // 7      | dep_7    | 1007
+    // 8      | dep_8    | 1008
+    // 9      | dep_9    | 1009
+    // 10     | dep_10   | 1010
+    Schema dep4Schema = new Schema();
+    dep4Schema.addColumn("dep_id", Type.INT4);
+    dep4Schema.addColumn("dep_name", Type.TEXT);
+    dep4Schema.addColumn("loc_id", Type.INT4);
+
+    TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path dep4Path = new Path(testDir, "dep4.csv");
+    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    appender4.init();
+    Tuple tuple4 = new VTuple(dep4Schema.size());
+    for (int i = 0; i < 11; i++) {
+      tuple4.put(new Datum[] { DatumFactory.createInt4(i),
+          DatumFactory.createText("dept_" + i),
+          DatumFactory.createInt4(1000 + i) });
+      appender4.addTuple(tuple4);
+    }
+    appender4.flush();
+    appender4.close();
+    dep4 = CatalogUtil.newTableDesc(DEP4_NAME, dep4Schema, dep4Meta, dep4Path);
+    catalog.createTable(dep4);
+
+    //----------------- job3 ------------------------------
+    // job_id | job_title
+    // ----------------------
+    // 101    | job_101
+    // 102    | job_102
+    // 103    | job_103
+    Schema job3Schema = new Schema();
+    job3Schema.addColumn("job_id", Type.INT4);
+    job3Schema.addColumn("job_title", Type.TEXT);
+
+    TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path job3Path = new Path(testDir, "job3.csv");
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    appender2.init();
+    Tuple tuple2 = new VTuple(job3Schema.size());
+    for (int i = 1; i < 4; i++) {
+      int x = 100 + i;
+      tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i),
+          DatumFactory.createText("job_" + x) });
+      appender2.addTuple(tuple2);
+    }
+    appender2.flush();
+    appender2.close();
+    job3 = CatalogUtil.newTableDesc(JOB3_NAME, job3Schema, job3Meta, job3Path);
+    catalog.createTable(job3);
+
+    //--------------------- emp3 --------------------
+    // emp_id | first_name | last_name | dep_id | salary | job_id
+    // ------------------------------------------------------------
+    // 11     | fn_11      | ln_11     | 1      | 123    | 101
+    // 13     | fn_13      | ln_13     | 3      | 369    | 103
+    // 15     | fn_15      | ln_15     | 5      | 615    | null
+    // 17     | fn_17      | ln_17     | 7      | 861    | null
+    // 19     | fn_19      | ln_19     | 9      | 1107   | null
+    // 21     | fn_21      | ln_21     | 1      | 123    | 101
+    // 23     | fn_23      | ln_23     | 3      | 369    | 103
+    Schema emp3Schema = new Schema();
+    emp3Schema.addColumn("emp_id", Type.INT4);
+    emp3Schema.addColumn("first_name", Type.TEXT);
+    emp3Schema.addColumn("last_name", Type.TEXT);
+    emp3Schema.addColumn("dep_id", Type.INT4);
+    emp3Schema.addColumn("salary", Type.FLOAT4);
+    emp3Schema.addColumn("job_id", Type.INT4);
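+
+    // The two loops below produce emp3's 7 rows: i = 1 and 3 each add two employees
+    // (emp_id 10+i and 20+i, both with job_id 100+i), while i = 5, 7 and 9 each add a
+    // single employee with a null job_id, matching the sketch above.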
+
+    TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path emp3Path = new Path(testDir, "emp3.csv");
+    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    appender3.init();
+    Tuple tuple3 = new VTuple(emp3Schema.size());
+
+    for (int i = 1; i < 4; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+
+      int y = 20 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(20 + i),
+          DatumFactory.createText("firstname_" + y),
+          DatumFactory.createText("lastname_" + y),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createInt4(100 + i) });
+      appender3.addTuple(tuple3);
+    }
+
+    for (int i = 5; i < 10; i += 2) {
+      int x = 10 + i;
+      tuple3.put(new Datum[] { DatumFactory.createInt4(10 + i),
+          DatumFactory.createText("firstname_" + x),
+          DatumFactory.createText("lastname_" + x),
+          DatumFactory.createInt4(i),
+          DatumFactory.createFloat4(123 * i),
+          DatumFactory.createNullDatum() });
+      appender3.addTuple(tuple3);
+    }
+
+    appender3.flush();
+    appender3.close();
+    emp3 = CatalogUtil.newTableDesc(EMP3_NAME, emp3Schema, emp3Meta, emp3Path);
+    catalog.createTable(emp3);
+
+    //--------------------- phone3 --------------------
+    // emp_id | phone_number
+    // -----------------------------------------------
+    // this table is empty, no rows
+    Schema phone3Schema = new Schema();
+    phone3Schema.addColumn("emp_id", Type.INT4);
+    phone3Schema.addColumn("phone_number", Type.TEXT);
+
+    TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Path phone3Path = new Path(testDir, "phone3.csv");
+    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+        phone3Path);
+    appender5.init();
+    appender5.flush();
+    appender5.close();
+    phone3 = CatalogUtil.newTableDesc(PHONE3_NAME, phone3Schema, phone3Meta, phone3Path);
+    catalog.createTable(phone3);
+
+    analyzer = new SQLAnalyzer();
+    planner = new LogicalPlanner(catalog);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    util.shutdownCatalogCluster();
+  }
+
+  String[] QUERIES = {
+      // [0] no nulls
+      "select dep3.dep_id, dep_name, emp_id, salary from emp3 full outer join dep3 on dep3.dep_id = emp3.dep_id",
+      // [1] nulls on the left operand
+      "select job3.job_id, job_title, emp_id, salary from emp3 full outer join job3 on job3.job_id=emp3.job_id",
+      // [2] nulls on the right operand
+      "select job3.job_id, job_title, emp_id, salary from job3 full outer join emp3 on job3.job_id=emp3.job_id",
+      // [3] no nulls; the right side continues after the left side is exhausted
+      "select dep4.dep_id, dep_name, emp_id, salary from emp3 full outer join dep4 on dep4.dep_id = emp3.dep_id",
+      // [4] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from emp3 full outer join phone3 on emp3.emp_id = phone3.emp_id",
+      // [5] one operand is empty
+      "select emp3.emp_id, first_name, phone_number from phone3 full outer join emp3 on emp3.emp_id = phone3.emp_id",
+  };
+
+  @Test
+  public final void testFullOuterMergeJoin0() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[0]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new
+        Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep3Frags =
+        StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin0");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(12, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin1() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[1]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin1");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin2() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[2]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] job3Frags =
+        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), job3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin2");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
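+    // QUERIES[2] mirrors QUERIES[1] with job3 on the left; a full outer join is symmetric
+    // in its inputs, so the same 8 rows are expected: four matches on job_id 101 and 103,
+    // one unmatched job (102), and three emp3 rows whose null job_id matches nothing.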
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(8, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin3() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[3]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] dep4Frags =
+        StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin3");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+
+    // even if the optimizer initially picked a hash join with the operands in reversed
+    // order, the enforcer must have converted it to a merge join exec
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(13, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin4() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[4]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+            Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin4");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+
+  @Test
+  public final void testFullOuterMergeJoin5() throws IOException, PlanningException {
+    Expr expr = analyzer.parse(QUERIES[5]);
+    LogicalNode plan = planner.createPlan(session, expr).getRootBlock().getRoot();
+    JoinNode joinNode = PlannerUtil.findTopNode(plan, NodeType.JOIN);
+    Enforcer enforcer = new Enforcer();
+    enforcer.enforceJoinAlgorithm(joinNode.getPID(),
+        JoinAlgorithm.MERGE_JOIN);
+
+    FileFragment[] emp3Frags =
+        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags =
+        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), phone3.getPath(),
+            Integer.MAX_VALUE);
+    FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags);
+
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testFullOuterMergeJoin5");
+    TaskAttemptContext ctx = new TaskAttemptContext(conf,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
+    ctx.setEnforcer(enforcer);
+
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
+
+    ProjectionExec proj = (ProjectionExec) exec;
+    assertTrue(proj.getChild() instanceof MergeFullOuterJoinExec);
+
+    int count = 0;
+    exec.init();
+
+    while (exec.next() != null) {
+      //TODO check contents
+      count = count + 1;
+    }
+    assertNull(exec.next());
+    exec.close();
+    assertEquals(7, count);
+  }
+}
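
The row counts asserted above (12, 8, 13, and 7) follow mechanically from the fixture data built in setUp(). As a sanity check, the standalone sketch below recomputes the full-outer-join cardinalities from the join-key multisets. It is illustrative only and not part of this commit; the class and helper names (FullOuterCountSketch, fullOuterRowCount, range) are made up for the example.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FullOuterCountSketch {
  // One output row per matching (left, right) pair, plus one null-padded row for every
  // left or right row whose key finds no partner (null keys never match anything).
  static int fullOuterRowCount(List<Integer> leftKeys, List<Integer> rightKeys) {
    Map<Integer, Integer> rightFreq = new HashMap<>();
    for (Integer k : rightKeys) {
      if (k != null) rightFreq.merge(k, 1, Integer::sum);
    }
    int count = 0;
    for (Integer k : leftKeys) {
      Integer matches = (k == null) ? null : rightFreq.get(k);
      count += (matches == null) ? 1 : matches;   // unmatched left row, or one row per match
    }
    for (Integer k : rightKeys) {
      if (k == null || !leftKeys.contains(k)) count++;  // unmatched right rows
    }
    return count;
  }

  static List<Integer> range(int from, int to) {  // keys from (inclusive) to (exclusive)
    List<Integer> keys = new ArrayList<>();
    for (int i = from; i < to; i++) keys.add(i);
    return keys;
  }

  public static void main(String[] args) {
    List<Integer> empDepIds = Arrays.asList(1, 1, 3, 3, 5, 7, 9);                  // emp3.dep_id
    List<Integer> empJobIds = Arrays.asList(101, 101, 103, 103, null, null, null); // emp3.job_id
    System.out.println(fullOuterRowCount(empDepIds, range(0, 10)));                // 12 (dep3)
    System.out.println(fullOuterRowCount(empJobIds, range(101, 104)));             // 8  (job3)
    System.out.println(fullOuterRowCount(empDepIds, range(0, 11)));                // 13 (dep4)
    System.out.println(fullOuterRowCount(empDepIds, new ArrayList<Integer>()));    // 7  (phone3 is empty)
  }
}

Running the sketch prints 12, 8, 13, and 7, matching testFullOuterMergeJoin0, 1, 3, and 4. Tests 1/2 and 4/5 assert the same counts pairwise because full outer join output does not depend on operand order.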