Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E7FA510D06 for ; Tue, 29 Oct 2013 10:26:31 +0000 (UTC) Received: (qmail 28040 invoked by uid 500); 29 Oct 2013 10:26:31 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 28001 invoked by uid 500); 29 Oct 2013 10:26:29 -0000 Mailing-List: contact commits-help@tajo.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.incubator.apache.org Delivered-To: mailing list commits@tajo.incubator.apache.org Received: (qmail 27992 invoked by uid 99); 29 Oct 2013 10:26:28 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Oct 2013 10:26:28 +0000 X-ASF-Spam-Status: No, hits=-2000.5 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 29 Oct 2013 10:26:14 +0000 Received: (qmail 27364 invoked by uid 99); 29 Oct 2013 10:25:50 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 29 Oct 2013 10:25:50 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id C2A0D88A9E6; Tue, 29 Oct 2013 10:25:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hyunsik@apache.org To: commits@tajo.incubator.apache.org Date: Tue, 29 Oct 2013 10:25:50 -0000 Message-Id: <479c25fc4abc4bdb9432d7bff76c7c2c@git.apache.org> In-Reply-To: <3befc71f733d4c799198496d04c1c52c@git.apache.org> References: <3befc71f733d4c799198496d04c1c52c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/3] TAJO-287: Improve Fragment to be more generic. (hyunsik) X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java index 025af87..ce31851 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java @@ -19,8 +19,9 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; @@ -37,15 +38,16 @@ import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.storage.*; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; -import org.apache.tajo.worker.TaskAttemptContext; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.apache.tajo.LocalTajoTestingUtility; import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestLeftOuterNLJoinExec { private TajoConf conf; @@ -239,12 +241,12 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuterNLJoinExec0() throws IOException, PlanningException { - Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), + FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(dep3Frags, emp3Frags); + FileFragment[] merged = TUtil.concat(dep3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuterNLJoinExec0"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -279,12 +281,12 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuterNLJoinExec1() throws IOException, PlanningException { - Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), + FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(job3Frags, emp3Frags); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec1"); @@ -323,12 +325,12 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuter_NLJoinExec2() throws IOException, PlanningException { - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), + FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, job3Frags); + FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec2"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -367,12 +369,12 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuter_NLJoinExec3() throws IOException, PlanningException { - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), + FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags); + FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec3"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -410,12 +412,12 @@ public class TestLeftOuterNLJoinExec { @Test public final void testLeftOuter_NLJoinExec4() throws IOException, PlanningException { - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), + FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(phone3Frags, emp3Frags); + FileFragment[] merged = TUtil.concat(phone3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestLeftOuter_NLJoinExec4"); TaskAttemptContext ctx = new TaskAttemptContext(conf, http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java index 5e72428..e6dd0a5 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java @@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; @@ -155,9 +156,9 @@ public class TestMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); - Fragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(empFrags, peopleFrags); + FileFragment[] empFrags = sm.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); + FileFragment[] peopleFrags = sm.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testMergeInnerJoin"); TaskAttemptContext ctx = new TaskAttemptContext(conf, http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java index 3d356ee..50d431c 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; @@ -138,12 +139,12 @@ public class TestNLJoinExec { @Test public final void testNLCrossJoin() throws IOException, PlanningException { - Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), + FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); - Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), + FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE); - Fragment [] merged = TUtil.concat(empFrags, peopleFrags); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLCrossJoin"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -166,12 +167,12 @@ public class TestNLJoinExec { @Test public final void testNLInnerJoin() throws IOException, PlanningException { - Fragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), + FileFragment[] empFrags = StorageManager.splitNG(conf, "e", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); - Fragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), + FileFragment[] peopleFrags = StorageManager.splitNG(conf, "p", people.getMeta(), people.getPath(), Integer.MAX_VALUE); - Fragment [] merged = TUtil.concat(empFrags, peopleFrags); + FileFragment[] merged = TUtil.concat(empFrags, peopleFrags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testNLInnerJoin"); TaskAttemptContext ctx = new TaskAttemptContext(conf, http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 8b50480..b57ef3a 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -45,6 +45,7 @@ import org.apache.tajo.engine.planner.logical.*; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; @@ -181,11 +182,11 @@ public class TestPhysicalPlanner { @Test public final void testCreateScanPlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); Expr expr = analyzer.parse(QUERIES[0]); LogicalPlan plan = planner.createPlan(expr); LogicalNode rootNode =plan.getRootBlock().getRoot(); @@ -210,11 +211,11 @@ public class TestPhysicalPlanner { @Test public final void testCreateScanWithFilterPlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); Expr expr = analyzer.parse(QUERIES[16]); LogicalPlan plan = planner.createPlan(expr); LogicalNode rootNode =plan.getRootBlock().getRoot(); @@ -237,11 +238,11 @@ public class TestPhysicalPlanner { @Test public final void testGroupByPlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[7]); LogicalPlan plan = planner.createPlan(context); @@ -267,12 +268,12 @@ public class TestPhysicalPlanner { @Test public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException { // TODO - currently, this query does not use hash-based group operator. - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir( "target/test-data/testHashGroupByPlanWithALLField"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(QUERIES[15]); LogicalPlan plan = planner.createPlan(expr); @@ -297,11 +298,11 @@ public class TestPhysicalPlanner { @Test public final void testSortGroupByPlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[]{frags[0]}, workDir); + new FileFragment[]{frags[0]}, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[7]); LogicalPlan plan = planner.createPlan(context); @@ -355,11 +356,11 @@ public class TestPhysicalPlanner { @Test public final void testStorePlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); ctx.setOutputPath(new Path(workDir, "grouped1")); @@ -376,7 +377,7 @@ public class TestPhysicalPlanner { exec.next(); exec.close(); - Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(), + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); scanner.init(); Tuple tuple; @@ -396,11 +397,11 @@ public class TestPhysicalPlanner { @Test public final void testStorePlanWithRCFile() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); ctx.setOutputPath(new Path(workDir, "grouped2")); @@ -416,7 +417,7 @@ public class TestPhysicalPlanner { exec.next(); exec.close(); - Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(), + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath()); scanner.init(); Tuple tuple; @@ -436,11 +437,11 @@ public class TestPhysicalPlanner { @Test public final void testPartitionedStorePlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan"); - TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir); + TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[7]); LogicalPlan plan = planner.createPlan(context); @@ -468,12 +469,12 @@ public class TestPhysicalPlanner { FileStatus [] list = fs.listStatus(path); assertEquals(numPartitions, list.length); - Fragment [] fragments = new Fragment[list.length]; + FileFragment[] fragments = new FileFragment[list.length]; int i = 0; for (FileStatus status : list) { - fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen()); + fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen()); } - Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments)); + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); scanner.init(); Tuple tuple; @@ -494,13 +495,13 @@ public class TestPhysicalPlanner { @Test public final void testPartitionedStorePlanWithEmptyGroupingSet() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); Path workDir = CommonTestingUtil.getTestDir( "target/test-data/testPartitionedStorePlanWithEmptyGroupingSet"); - TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new Fragment[] { frags[0] }, workDir); + TaskAttemptContext ctx = new TaskAttemptContext(conf, id, new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(QUERIES[14]); LogicalPlan plan = planner.createPlan(expr); @@ -526,12 +527,12 @@ public class TestPhysicalPlanner { FileStatus [] list = fs.listStatus(path); assertEquals(numPartitions, list.length); - Fragment [] fragments = new Fragment[list.length]; + FileFragment[] fragments = new FileFragment[list.length]; int i = 0; for (FileStatus status : list) { - fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen()); + fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen()); } - Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments)); + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); scanner.init(); Tuple tuple; i = 0; @@ -550,11 +551,11 @@ public class TestPhysicalPlanner { @Test public final void testAggregationFunction() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[8]); LogicalPlan plan = planner.createPlan(context); @@ -584,11 +585,11 @@ public class TestPhysicalPlanner { @Test public final void testCountFunction() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[9]); LogicalPlan plan = planner.createPlan(context); @@ -616,11 +617,11 @@ public class TestPhysicalPlanner { @Test public final void testGroupByWithNullValue() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[11]); LogicalPlan plan = planner.createPlan(context); @@ -640,11 +641,11 @@ public class TestPhysicalPlanner { @Test public final void testUnionPlan() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { frags[0] }, workDir); + new FileFragment[] { frags[0] }, workDir); Expr context = analyzer.parse(QUERIES[0]); LogicalPlan plan = planner.createPlan(context); LogicalNode rootNode = optimizer.optimize(plan); @@ -668,7 +669,7 @@ public class TestPhysicalPlanner { public final void testEvalExpr() throws IOException, PlanningException { Path workDir = CommonTestingUtil.getTestDir("target/test-data/testEvalExpr"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] { }, workDir); + new FileFragment[] { }, workDir); Expr expr = analyzer.parse(QUERIES[12]); LogicalPlan plan = planner.createPlan(expr); LogicalNode rootNode = optimizer.optimize(plan); @@ -700,11 +701,11 @@ public class TestPhysicalPlanner { //@Test public final void testCreateIndex() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); Expr context = analyzer.parse(createIndexStmt[0]); LogicalPlan plan = planner.createPlan(context); LogicalNode rootNode = optimizer.optimize(plan); @@ -726,12 +727,12 @@ public class TestPhysicalPlanner { @Test public final void testDuplicateEliminate() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(new Enforcer()); Expr expr = analyzer.parse(duplicateElimination[0]); LogicalPlan plan = planner.createPlan(expr); @@ -759,12 +760,12 @@ public class TestPhysicalPlanner { @Test public final void testIndexedStoreExec() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(SORT_QUERY[0]); LogicalPlan plan = planner.createPlan(context); @@ -848,7 +849,7 @@ public class TestPhysicalPlanner { @Test public final void testSortEnforcer() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employee.getMeta(), employee.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer"); @@ -862,7 +863,7 @@ public class TestPhysicalPlanner { Enforcer enforcer = new Enforcer(); enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.IN_MEMORY_SORT); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); @@ -883,7 +884,7 @@ public class TestPhysicalPlanner { enforcer = new Enforcer(); enforcer.enforceSortAlgorithm(sortNode.getPID(), SortAlgorithm.MERGE_SORT); ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); phyPlanner = new PhysicalPlannerImpl(conf,sm); @@ -897,7 +898,7 @@ public class TestPhysicalPlanner { @Test public final void testGroupByEnforcer() throws IOException, PlanningException { - Fragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); + FileFragment[] frags = StorageManager.splitNG(conf, "score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer"); Expr context = analyzer.parse(QUERIES[7]); @@ -910,7 +911,7 @@ public class TestPhysicalPlanner { Enforcer enforcer = new Enforcer(); enforcer.enforceHashAggregation(groupByNode.getPID()); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); @@ -931,7 +932,7 @@ public class TestPhysicalPlanner { enforcer = new Enforcer(); enforcer.enforceSortAggregation(groupByNode.getPID(), null); ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan), - new Fragment[] {frags[0]}, workDir); + new FileFragment[] {frags[0]}, workDir); ctx.setEnforcer(enforcer); phyPlanner = new PhysicalPlannerImpl(conf,sm); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java index c95881a..d582e2b 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java @@ -19,8 +19,9 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; @@ -37,14 +38,15 @@ import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.storage.*; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; -import org.apache.tajo.worker.TaskAttemptContext; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.apache.tajo.LocalTajoTestingUtility; import java.io.IOException; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; // this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order public class TestRightOuterHashJoinExec { @@ -217,12 +219,12 @@ public class TestRightOuterHashJoinExec { @Test public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException { - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), + FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags); + FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec0"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -257,12 +259,12 @@ public class TestRightOuterHashJoinExec { @Test public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException { - Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), + FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, job3Frags); + FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec1"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -297,12 +299,12 @@ public class TestRightOuterHashJoinExec { @Test public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException { - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), + FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(job3Frags, emp3Frags); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestRightOuter_HashJoinExec2"); TaskAttemptContext ctx = new TaskAttemptContext(conf, http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java index bf08479..5bbb4aa 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; @@ -299,9 +300,9 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, dep3Frags); + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); + FileFragment[] dep3Frags = StorageManager.splitNG(conf, "dep3", dep3.getMeta(), dep3.getPath(), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin0"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -334,9 +335,9 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(job3Frags, emp3Frags); + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); + FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -368,9 +369,9 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(job3Frags, emp3Frags); + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); + FileFragment[] job3Frags = StorageManager.splitNG(conf, "job3", job3.getMeta(), job3.getPath(), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -402,9 +403,9 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, dep4Frags); + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); + FileFragment[] dep4Frags = StorageManager.splitNG(conf, "dep4", dep4.getMeta(), dep4.getPath(), Integer.MAX_VALUE); + FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -437,10 +438,10 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); + FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(emp3Frags, phone3Frags); + FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin4"); TaskAttemptContext ctx = new TaskAttemptContext(conf, @@ -472,10 +473,10 @@ public class TestRightOuterMergeJoinExec { Enforcer enforcer = new Enforcer(); enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN); - Fragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); - Fragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), + FileFragment[] emp3Frags = StorageManager.splitNG(conf, "emp3", emp3.getMeta(), emp3.getPath(), Integer.MAX_VALUE); + FileFragment[] phone3Frags = StorageManager.splitNG(conf, "phone3", phone3.getMeta(), phone3.getPath(), Integer.MAX_VALUE); - Fragment[] merged = TUtil.concat(phone3Frags,emp3Frags); + FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags); Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin5"); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index 3dc3a4a..45badd5 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -21,6 +21,7 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; @@ -108,10 +109,10 @@ public class TestSortExec { @Test public final void testNext() throws IOException, PlanningException { - Fragment [] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE); + FileFragment[] frags = sm.splitNG(conf, "employee", employeeMeta, tablePath, Integer.MAX_VALUE); Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec"); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility - .newQueryUnitAttemptId(), new Fragment[] { frags[0] }, workDir); + .newQueryUnitAttemptId(), new FileFragment[] { frags[0] }, workDir); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[0]); LogicalPlan plan = planner.createPlan(context); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java new file mode 100644 index 0000000..684cb58 --- /dev/null +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFileFragment.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import com.google.common.collect.Sets; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.SortedSet; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestFileFragment { + private Path path; + + @Before + public final void setUp() throws Exception { + path = CommonTestingUtil.getTestDir(); + } + + @Test + public final void testGetAndSetFields() { + FileFragment fragment1 = new FileFragment("table1_1", new Path(path, "table0"), 0, 500); + fragment1.setDistCached(); + + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0"), fragment1.getPath()); + assertTrue(fragment1.isDistCached()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getEndKey()); + } + + @Test + public final void testGetProtoAndRestore() { + FileFragment fragment = new FileFragment("table1_1", new Path(path, "table0"), 0, 500); + fragment.setDistCached(); + + FileFragment fragment1 = FragmentConvertor.convert(FileFragment.class, fragment.getProto()); + assertEquals("table1_1", fragment1.getTableName()); + assertEquals(new Path(path, "table0"), fragment1.getPath()); + assertTrue(fragment.isDistCached()); + assertTrue(0 == fragment1.getStartKey()); + assertTrue(500 == fragment1.getEndKey()); + } + + @Test + public final void testCompareTo() { + final int num = 10; + FileFragment[] tablets = new FileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500); + } + + Arrays.sort(tablets); + + for(int i = 0; i < num; i++) { + assertEquals("tablet1_"+i, tablets[i].getTableName()); + } + } + + @Test + public final void testCompareTo2() { + final int num = 1860; + FileFragment[] tablets = new FileFragment[num]; + for (int i = num - 1; i >= 0; i--) { + tablets[i] = new FileFragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500); + } + + SortedSet sortedSet = Sets.newTreeSet(); + for (FileFragment frag : tablets) { + sortedSet.add(frag); + } + assertEquals(num, sortedSet.size()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java deleted file mode 100644 index 630acd1..0000000 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java +++ /dev/null @@ -1,100 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.collect.Sets; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.util.CommonTestingUtil; -import org.junit.Before; -import org.junit.Test; - -import java.util.Arrays; -import java.util.SortedSet; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class TestFragment { - private Schema schema1; - private Path path; - - @Before - public final void setUp() throws Exception { - schema1 = new Schema(); - schema1.addColumn("id", Type.INT4); - schema1.addColumn("name", Type.TEXT); - path = CommonTestingUtil.getTestDir(); - } - - @Test - public final void testGetAndSetFields() { - Fragment fragment1 = new Fragment("table1_1", new Path(path, "table0"), 0, 500); - fragment1.setDistCached(); - - assertEquals("table1_1", fragment1.getName()); - assertEquals(new Path(path, "table0"), fragment1.getPath()); - assertTrue(fragment1.isDistCached()); - assertTrue(0 == fragment1.getStartOffset()); - assertTrue(500 == fragment1.getLength()); - } - - @Test - public final void testTabletTabletProto() { - Fragment fragment0 = new Fragment("table1_1", new Path(path, "table0"), 0, 500); - fragment0.setDistCached(); - Fragment fragment1 = new Fragment(fragment0.getProto()); - assertEquals("table1_1", fragment1.getName()); - assertEquals(new Path(path, "table0"), fragment1.getPath()); - assertTrue(fragment1.isDistCached()); - assertTrue(0 == fragment1.getStartOffset()); - assertTrue(500 == fragment1.getLength()); - } - - @Test - public final void testCompareTo() { - final int num = 10; - Fragment [] tablets = new Fragment[num]; - for (int i = num - 1; i >= 0; i--) { - tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500); - } - - Arrays.sort(tablets); - - for(int i = 0; i < num; i++) { - assertEquals("tablet1_"+i, tablets[i].getName()); - } - } - - @Test - public final void testCompareTo2() { - final int num = 1860; - Fragment [] tablets = new Fragment[num]; - for (int i = num - 1; i >= 0; i--) { - tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500); - } - - SortedSet sortedSet = Sets.newTreeSet(); - for (Fragment frag : tablets) { - sortedSet.add(frag); - } - assertEquals(num, sortedSet.size()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java index 2678a54..99d89c0 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java @@ -35,6 +35,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.FileUtil; import org.junit.After; import org.junit.Before; @@ -106,7 +107,7 @@ public class TestRowFile { TableProto proto = (TableProto) FileUtil.loadProto( cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance()); meta = new TableMeta(proto); - Fragment fragment = new Fragment("test.tbl", dataPath, 0, file.getLen()); + FileFragment fragment = new FileFragment("test.tbl", dataPath, 0, file.getLen()); int tupleCnt = 0; start = System.currentTimeMillis(); @@ -124,8 +125,8 @@ public class TestRowFile { long fileLen = file.getLen()/13; for (int i = 0; i < 13; i++) { - fragment = new Fragment("test.tbl", dataPath, fileStart, fileLen); - scanner = new RowFile.RowFileScanner(conf, meta, schema, fragment); + fragment = new FileFragment("test.tbl", dataPath, fileStart, fileLen); + scanner = new RowFile.RowFileScanner(conf, schema, meta, fragment); scanner.init(); while ((tuple=scanner.next()) != null) { if (!idSet.remove(tuple.get(0).asInt4())) { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index 3b568fc..6934872 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -37,6 +37,7 @@ import org.apache.tajo.engine.planner.*; import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.engine.planner.physical.*; import org.apache.tajo.storage.*; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.dataserver.retriever.FileChunk; @@ -127,11 +128,11 @@ public class TestRangeRetrieverHandler { TableDesc employee = new TableDesc("employee", schema, employeeMeta, tableDir); catalog.addTable(employee); - Fragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE); + FileFragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(), - new Fragment[] {frags[0]}, testDir); + new FileFragment[] {frags[0]}, testDir); Expr expr = analyzer.parse(SORT_QUERY[0]); LogicalPlan plan = planner.createPlan(expr); LogicalNode rootNode = optimizer.optimize(plan); @@ -240,11 +241,11 @@ public class TestRangeRetrieverHandler { TableDesc employee = new TableDesc("employee", schema, meta, tablePath); catalog.addTable(employee); - Fragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE); + FileFragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE); TaskAttemptContext ctx = new TaskAttemptContext(conf, LocalTajoTestingUtility.newQueryUnitAttemptId(), - new Fragment[] {frags[0]}, testDir); + new FileFragment[] {frags[0]}, testDir); Expr expr = analyzer.parse(SORT_QUERY[1]); LogicalPlan plan = planner.createPlan(expr); LogicalNode rootNode = optimizer.optimize(plan); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java index 0e695a3..efdd023 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java @@ -30,17 +30,20 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.annotation.ForSplitableStore; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Bytes; import org.apache.tajo.util.FileUtil; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; -import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; + public abstract class AbstractStorageManager { private final Log LOG = LogFactory.getLog(AbstractStorageManager.class); @@ -82,16 +85,23 @@ public abstract class AbstractStorageManager { LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')"); } - public Scanner getScanner(TableMeta meta, Schema schema, Path path) + public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) throws IOException { FileSystem fs = path.getFileSystem(conf); FileStatus status = fs.getFileStatus(path); - Fragment fragment = new Fragment(path.getName(), path, 0, status.getLen()); + FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); return getScanner(meta, schema, fragment); } - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) - throws IOException { + public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment) throws IOException { + return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), schema); + } + + public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { + return getScanner(meta, schema, FragmentConvertor.convert(conf, meta.getStoreType(), fragment), target); + } + + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { return getScanner(meta, schema, fragment, schema); } @@ -175,51 +185,51 @@ public abstract class AbstractStorageManager { return meta; } - public Fragment[] split(String tableName) throws IOException { + public FileFragment[] split(String tableName) throws IOException { Path tablePath = new Path(tableBaseDir, tableName); return split(tableName, tablePath, fs.getDefaultBlockSize()); } - public Fragment[] split(String tableName, long fragmentSize) throws IOException { + public FileFragment[] split(String tableName, long fragmentSize) throws IOException { Path tablePath = new Path(tableBaseDir, tableName); return split(tableName, tablePath, fragmentSize); } - public Fragment[] splitBroadcastTable(Path tablePath) throws IOException { + public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException { FileSystem fs = tablePath.getFileSystem(conf); TableMeta meta = getTableMeta(tablePath); - List listTablets = new ArrayList(); - Fragment tablet; + List listTablets = new ArrayList(); + FileFragment tablet; FileStatus[] fileLists = fs.listStatus(tablePath); for (FileStatus file : fileLists) { - tablet = new Fragment(tablePath.getName(), file.getPath(), 0, file.getLen()); + tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen()); listTablets.add(tablet); } - Fragment[] tablets = new Fragment[listTablets.size()]; + FileFragment[] tablets = new FileFragment[listTablets.size()]; listTablets.toArray(tablets); return tablets; } - public Fragment[] split(Path tablePath) throws IOException { + public FileFragment[] split(Path tablePath) throws IOException { FileSystem fs = tablePath.getFileSystem(conf); return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); } - public Fragment[] split(String tableName, Path tablePath) throws IOException { + public FileFragment[] split(String tableName, Path tablePath) throws IOException { return split(tableName, tablePath, fs.getDefaultBlockSize()); } - private Fragment[] split(String tableName, Path tablePath, long size) + private FileFragment[] split(String tableName, Path tablePath, long size) throws IOException { FileSystem fs = tablePath.getFileSystem(conf); TableMeta meta = getTableMeta(tablePath); long defaultBlockSize = size; - List listTablets = new ArrayList(); - Fragment tablet; + List listTablets = new ArrayList(); + FileFragment tablet; FileStatus[] fileLists = fs.listStatus(tablePath); for (FileStatus file : fileLists) { @@ -227,31 +237,31 @@ public abstract class AbstractStorageManager { long start = 0; if (remainFileSize > defaultBlockSize) { while (remainFileSize > defaultBlockSize) { - tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize); + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); listTablets.add(tablet); start += defaultBlockSize; remainFileSize -= defaultBlockSize; } - listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize)); + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); } else { - listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize)); + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); } } - Fragment[] tablets = new Fragment[listTablets.size()]; + FileFragment[] tablets = new FileFragment[listTablets.size()]; listTablets.toArray(tablets); return tablets; } - public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta, + public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, Path tablePath, long size) throws IOException { FileSystem fs = tablePath.getFileSystem(conf); long defaultBlockSize = size; - List listTablets = new ArrayList(); - Fragment tablet; + List listTablets = new ArrayList(); + FileFragment tablet; FileStatus[] fileLists = fs.listStatus(tablePath); for (FileStatus file : fileLists) { @@ -259,18 +269,18 @@ public abstract class AbstractStorageManager { long start = 0; if (remainFileSize > defaultBlockSize) { while (remainFileSize > defaultBlockSize) { - tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize); + tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); listTablets.add(tablet); start += defaultBlockSize; remainFileSize -= defaultBlockSize; } - listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize)); + listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); } else { - listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize)); + listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); } } - Fragment[] tablets = new Fragment[listTablets.size()]; + FileFragment[] tablets = new FileFragment[listTablets.size()]; listTablets.toArray(tablets); return tablets; @@ -396,24 +406,10 @@ public abstract class AbstractStorageManager { * @return is this file isSplittable? */ protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException { - Scanner scanner = getScanner(meta, schema, filename); + Scanner scanner = getFileScanner(meta, schema, filename); return scanner.isSplittable(); } - protected boolean isSplittable(CatalogProtos.StoreType storeType) throws IOException { - Method[] methods = getScannerClass(storeType).getMethods(); - - for (Method method : methods) { - ForSplitableStore annos = method.getAnnotation(ForSplitableStore.class); - if (annos != null) { - return true; - } - } - - return false; - } - - @Deprecated protected long computeSplitSize(long blockSize, long minSize, long maxSize) { @@ -444,17 +440,17 @@ public abstract class AbstractStorageManager { * A factory that makes the split for this class. It can be overridden * by sub-classes to make sub-types */ - protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) { - return new Fragment(fragmentId, file, start, length); + protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) { + return new FileFragment(fragmentId, file, start, length); } - protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation, + protected FileFragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation, int[] diskIds) throws IOException { - return new Fragment(fragmentId, file, blockLocation, diskIds); + return new FileFragment(fragmentId, file, blockLocation, diskIds); } // for Non Splittable. eg, compressed gzip TextFile - protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length, + protected FileFragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length, BlockLocation[] blkLocations) throws IOException { Map hostsBlockMap = new HashMap(); @@ -485,7 +481,7 @@ public abstract class AbstractStorageManager { hosts[i] = entry.getKey(); hostsBlockCount[i] = entry.getValue(); } - return new Fragment(fragmentId, file, start, length, hosts, hostsBlockCount); + return new FileFragment(fragmentId, file, start, length, hosts, hostsBlockCount); } /** @@ -536,9 +532,9 @@ public abstract class AbstractStorageManager { * Generate the map of host and make them into Volume Ids. * */ - private Map> getVolumeMap(List frags) { + private Map> getVolumeMap(List frags) { Map> volumeMap = new HashMap>(); - for (Fragment frag : frags) { + for (FileFragment frag : frags) { String[] hosts = frag.getHosts(); int[] diskIds = frag.getDiskIds(); for (int i = 0; i < hosts.length; i++) { @@ -561,10 +557,10 @@ public abstract class AbstractStorageManager { * * @throws IOException */ - public List getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException { + public List getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException { // generate splits' - List splits = new ArrayList(); + List splits = new ArrayList(); List files = listStatus(inputPath); FileSystem fs = inputPath.getFileSystem(conf); for (FileStatus file : files) { @@ -613,22 +609,22 @@ public abstract class AbstractStorageManager { private static final Class[] DEFAULT_SCANNER_PARAMS = { Configuration.class, - TableMeta.class, Schema.class, - Fragment.class + TableMeta.class, + FileFragment.class }; private static final Class[] DEFAULT_APPENDER_PARAMS = { Configuration.class, - TableMeta.class, Schema.class, + TableMeta.class, Path.class }; /** * create a scanner instance. */ - public static T newScannerInstance(Class theClass, Configuration conf, TableMeta meta, Schema schema, + public static T newScannerInstance(Class theClass, Configuration conf, Schema schema, TableMeta meta, Fragment fragment) { T result; try { @@ -638,7 +634,7 @@ public abstract class AbstractStorageManager { meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } - result = meth.newInstance(new Object[]{conf, meta, schema, fragment}); + result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); } catch (Exception e) { throw new RuntimeException(e); } @@ -659,7 +655,7 @@ public abstract class AbstractStorageManager { meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } - result = meth.newInstance(new Object[]{conf, meta, schema, path}); + result = meth.newInstance(new Object[]{conf, schema, meta, path}); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java index 5ac989d..6760367 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -39,9 +39,9 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; import org.apache.tajo.exception.UnsupportedException; -import org.apache.tajo.storage.annotation.ForSplitableStore; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.Bytes; import java.io.*; @@ -75,8 +75,8 @@ public class CSVFile { private byte[] nullChars; private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - public CSVAppender(Configuration conf, final TableMeta meta, final Schema schema, final Path path) throws IOException { - super(conf, meta, schema, path); + public CSVAppender(Configuration conf, final Schema schema, final TableMeta meta, final Path path) throws IOException { + super(conf, schema, meta, path); this.fs = path.getFileSystem(conf); this.meta = meta; this.schema = schema; @@ -315,11 +315,10 @@ public class CSVFile { } } - @ForSplitableStore public static class CSVScanner extends FileScanner implements SeekableScanner { - public CSVScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment) + public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) throws IOException { - super(conf, meta, schema, fragment); + super(conf, schema, meta, fragment); factory = new CompressionCodecFactory(conf); codec = factory.getCodec(fragment.getPath()); if (isCompress() && !(codec instanceof SplittableCompressionCodec)) { @@ -365,11 +364,11 @@ public class CSVFile { @Override public void init() throws IOException { - // Fragment information + // FileFragment information fs = fragment.getPath().getFileSystem(conf); fis = fs.open(fragment.getPath()); - startOffset = fragment.getStartOffset(); - length = fragment.getLength(); + startOffset = fragment.getStartKey(); + length = fragment.getEndKey(); if(startOffset > 0) startOffset--; // prev line feed http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java index 55a9bf3..064841f 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java @@ -35,7 +35,7 @@ public abstract class FileAppender implements Appender { protected boolean enabledStats; - public FileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) { + public FileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) { this.conf = conf; this.meta = meta; this.schema = schema; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java index 9c22b4f..a9cfe1a 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java @@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.storage.fragment.FileFragment; import java.io.IOException; @@ -30,12 +31,12 @@ public abstract class FileScanner implements Scanner { protected final Configuration conf; protected final TableMeta meta; protected final Schema schema; - protected final Fragment fragment; + protected final FileFragment fragment; protected final int columnNum; protected Column [] targets; - public FileScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment) { + public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final FileFragment fragment) { this.conf = conf; this.meta = meta; this.schema = schema; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java deleted file mode 100644 index f40c9a7..0000000 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import org.apache.tajo.util.TUtil; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -public class Fragment implements Comparable { - protected FragmentProto.Builder builder = null; - - @Expose private String tableName; // required - @Expose private Path uri; // required - @Expose private Long startOffset; // required - @Expose private Long length; // required - @Expose private boolean distCached = false; // optional - - private String[] hosts; // Datanode hostnames - @Expose private int[] hostsBlockCount; // list of block count of hosts - @Expose private int[] diskIds; - - public Fragment() { - builder = FragmentProto.newBuilder(); - } - - public Fragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds) - throws IOException { - this(); - this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), - blockLocation.getHosts(), diskIds); - } - - // Non splittable - public Fragment(String tableName, Path uri, long start, long length, String[] hosts, - int[] hostsBlockCount) { - this(); - this.set(tableName, uri, start, length, null, null); - this.hosts = hosts; - this.hostsBlockCount = hostsBlockCount; - } - - public Fragment(String fragmentId, Path path, long start, long length) { - this(); - this.set(fragmentId, path, start, length, null, null); - } - - public Fragment(FragmentProto proto) { - this(); - int[] diskIds = new int[proto.getDiskIdsList().size()]; - int i = 0; - for(Integer eachValue: proto.getDiskIdsList()) { - diskIds[i++] = eachValue; - } - this.set(proto.getId(), new Path(proto.getPath()), - proto.getStartOffset(), proto.getLength(), - proto.getHostsList().toArray(new String[]{}), - diskIds); - if (proto.hasDistCached() && proto.getDistCached()) { - distCached = true; - } - } - - private void set(String tableName, Path path, long start, - long length, String[] hosts, int[] diskIds) { - this.tableName = tableName; - this.uri = path; - this.startOffset = start; - this.length = length; - this.hosts = hosts; - this.diskIds = diskIds; - } - - - /** - * Get the list of hosts (hostname) hosting this block - */ - public String[] getHosts() { - if (hosts == null) { - this.hosts = new String[0]; - } - return hosts; - } - - /** - * Get the list of hosts block count - * if a fragment given multiple block, it returned 'host0:3, host1:1 ...' - */ - public int[] getHostsBlockCount() { - if (hostsBlockCount == null) { - this.hostsBlockCount = new int[getHosts().length]; - Arrays.fill(this.hostsBlockCount, 1); - } - return hostsBlockCount; - } - - /** - * Get the list of Disk Ids - * Unknown disk is -1. Others 0 ~ N - */ - public int[] getDiskIds() { - if (diskIds == null) { - this.diskIds = new int[getHosts().length]; - Arrays.fill(this.diskIds, -1); - } - return diskIds; - } - - public String getName() { - return this.tableName; - } - - public void setName(String tableName) { - this.tableName = tableName; - } - - public Path getPath() { - return this.uri; - } - - public void setPath(Path path) { - this.uri = path; - } - - public Long getStartOffset() { - return this.startOffset; - } - - public Long getLength() { - return this.length; - } - - public Boolean isDistCached() { - return this.distCached; - } - - public void setDistCached() { - this.distCached = true; - } - - /** - * - * The offset range of tablets MUST NOT be overlapped. - * - * @param t - * @return If the table paths are not same, return -1. - */ - @Override - public int compareTo(Fragment t) { - if (getPath().equals(t.getPath())) { - long diff = this.getStartOffset() - t.getStartOffset(); - if (diff < 0) { - return -1; - } else if (diff > 0) { - return 1; - } else { - return 0; - } - } else { - return -1; - } - } - - @Override - public boolean equals(Object o) { - if (o instanceof Fragment) { - Fragment t = (Fragment) o; - if (getPath().equals(t.getPath()) - && TUtil.checkEquals(t.getStartOffset(), this.getStartOffset()) - && TUtil.checkEquals(t.getLength(), this.getLength()) - && TUtil.checkEquals(t.isDistCached(), this.isDistCached())) { - return true; - } - } - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(tableName, uri, startOffset, length, isDistCached()); - } - - public Object clone() throws CloneNotSupportedException { - Fragment frag = (Fragment) super.clone(); - frag.builder = FragmentProto.newBuilder(); - frag.tableName = tableName; - frag.uri = uri; - frag.distCached = distCached; - frag.diskIds = diskIds; - frag.hosts = hosts; - frag.hostsBlockCount = hostsBlockCount; - - return frag; - } - - @Override - public String toString() { - return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": " - +getPath() + "\", \"start\": " + this.getStartOffset() + ",\"length\": " - + getLength() + ", \"distCached\": " + distCached + "}" ; - } - - public FragmentProto getProto() { - if (builder == null) { - builder = FragmentProto.newBuilder(); - } - builder.setId(this.tableName); - builder.setStartOffset(this.startOffset); - builder.setLength(this.length); - builder.setPath(this.uri.toString()); - builder.setDistCached(this.distCached); - if(diskIds != null) { - List idList = new ArrayList(); - for(int eachId: diskIds) { - idList.add(eachId); - } - builder.addAllDiskIds(idList); - } - - if(hosts != null) { - builder.addAllHosts(TUtil.newList(hosts)); - } - - return builder.build(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java index b392e4e..2caa737 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -18,14 +18,18 @@ package org.apache.tajo.storage; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; import java.io.IOException; -import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -34,17 +38,22 @@ public class MergeScanner implements Scanner { private Configuration conf; private TableMeta meta; private Schema schema; - private List fragments; - private Iterator iterator; - private Fragment currentFragment; + private List fragments; + private Iterator iterator; + private FileFragment currentFragment; private Scanner currentScanner; private Tuple tuple; - public MergeScanner(Configuration conf, TableMeta meta, Schema schema, Collection fragments) { + public MergeScanner(Configuration conf, Schema schema, TableMeta meta, Collection rawFragmentList) + throws IOException { this.conf = conf; - this.meta = meta; this.schema = schema; - this.fragments = new ArrayList(fragments); + this.meta = meta; + this.fragments = Lists.newArrayList(); + for (Fragment f : rawFragmentList) { + fragments.add((FileFragment) f); + } + iterator = this.fragments.iterator(); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/3c22d3eb/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java index 0eb39c1..a484643 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -31,6 +31,7 @@ import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatumFactory; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.util.BitArray; import java.io.File; @@ -58,15 +59,15 @@ public class RawFile { private boolean eof = false; private long fileSize; - public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException { - super(conf, meta, schema, null); + public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { + super(conf, schema, meta, null); this.path = path; init(); } @SuppressWarnings("unused") - public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Fragment fragment) throws IOException { - this(conf, meta, schema, fragment.getPath()); + public RawFileScanner(Configuration conf, Schema schema, TableMeta meta, FileFragment fragment) throws IOException { + this(conf, schema, meta, fragment.getPath()); } public void init() throws IOException { @@ -309,8 +310,8 @@ public class RawFile { private TableStatistics stats; - public RawFileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException { - super(conf, meta, schema, path); + public RawFileAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { + super(conf, schema, meta, path); } public void init() throws IOException {