From: hyunsik@apache.org
To: commits@tajo.incubator.apache.org
Reply-To: dev@tajo.incubator.apache.org
Date: Sun, 27 Oct 2013 06:40:53 -0000
Subject: [2/5] TAJO-287: Refactor TableDesc, TableMeta, and Fragment.
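
The diff below migrates the storage layer and its tests to the refactored catalog APIs: TableMeta no longer carries a Schema, schemas are passed explicitly to the Appender/Scanner factory methods, TableDescImpl becomes TableDesc (holding schema, meta, and path), table statistics move from TableMeta to TableDesc as TableStats, and Fragment drops its embedded TableMeta. A minimal sketch of the resulting write-side call pattern, assuming only the signatures visible in this patch (class, method, and variable names below are illustrative, not part of the patch):

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.*;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.*;

public class EmployeeWriteSketch {  // illustrative name, not in the patch
  public static TableDesc writeEmployee(TajoConf conf, Path tablePath) throws java.io.IOException {
    Schema schema = new Schema();
    schema.addColumn("name", Type.TEXT);
    schema.addColumn("empId", Type.INT4);

    // TableMeta now describes only the store type (and options); the schema is separate.
    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);

    // getAppender takes the schema explicitly instead of reading it from the meta.
    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
    appender.init();
    Tuple tuple = new VTuple(schema.getColumnNum());
    tuple.put(new Datum[] { DatumFactory.createText("name_1"), DatumFactory.createInt4(1) });
    appender.addTuple(tuple);
    appender.flush();
    appender.close();

    // TableDesc (formerly TableDescImpl) owns schema, meta, and path; stats now live here too.
    TableDesc desc = new TableDesc("employee", schema, meta, tablePath);
    TableStats stats = new TableStats();
    stats.setNumRows(1);
    desc.setStats(stats);
    return desc;
  }
}

The same schema-passed-explicitly pattern recurs throughout the hunks below for getScanner, getSeekableScanner, and MergeScanner.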
(hyunsik) X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index 522328b..8b50480 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -93,14 +93,10 @@ public class TestPhysicalPlanner { catalog.createFunction(funcDesc); } - Schema schema = new Schema(); - schema.addColumn("name", Type.TEXT); - schema.addColumn("empId", Type.INT4); - schema.addColumn("deptName", Type.TEXT); - - Schema schema2 = new Schema(); - schema2.addColumn("deptName", Type.TEXT); - schema2.addColumn("manager", Type.TEXT); + Schema employeeSchema = new Schema(); + employeeSchema.addColumn("name", Type.TEXT); + employeeSchema.addColumn("empId", Type.INT4); + employeeSchema.addColumn("deptName", Type.TEXT); Schema scoreSchema = new Schema(); scoreSchema.addColumn("deptName", Type.TEXT); @@ -108,13 +104,14 @@ public class TestPhysicalPlanner { scoreSchema.addColumn("score", Type.INT4); scoreSchema.addColumn("nullable", Type.TEXT); - TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV); + TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path employeePath = new Path(testDir, "employee.csv"); - Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema, + employeePath); appender.init(); - Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum()); + Tuple tuple = new VTuple(employeeSchema.getColumnNum()); for (int i = 0; i < 100; i++) { tuple.put(new Datum[] {DatumFactory.createText("name_" + i), DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i)}); @@ -123,15 +120,15 @@ public class TestPhysicalPlanner { appender.flush(); appender.close(); - employee = new TableDescImpl("employee", employeeMeta, employeePath); + employee = new TableDesc("employee", employeeSchema, employeeMeta, employeePath); catalog.addTable(employee); Path scorePath = new Path(testDir, "score"); - TableMeta scoreMeta = CatalogUtil.newTableMeta(scoreSchema, StoreType.CSV, new Options()); - appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scorePath); + TableMeta scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV, new Options()); + appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, scorePath); appender.init(); - score = new TableDescImpl("score", scoreMeta, scorePath); - tuple = new VTuple(score.getMeta().getSchema().getColumnNum()); + score = new TableDesc("score", scoreSchema, scoreMeta, scorePath); + tuple = new VTuple(scoreSchema.getColumnNum()); int m = 0; for (int i = 1; i <= 5; i++) { for (int k = 3; k < 5; k++) { @@ -371,8 +368,7 @@ public class TestPhysicalPlanner { LogicalNode rootNode = optimizer.optimize(plan); - TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), - StoreType.CSV); + TableMeta 
outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); @@ -380,7 +376,8 @@ public class TestPhysicalPlanner { exec.next(); exec.close(); - Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(), + ctx.getOutputPath()); scanner.init(); Tuple tuple; int i = 0; @@ -411,8 +408,7 @@ public class TestPhysicalPlanner { LogicalPlan plan = planner.createPlan(context); LogicalNode rootNode = optimizer.optimize(plan); - TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), - StoreType.RCFILE); + TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); @@ -420,7 +416,8 @@ public class TestPhysicalPlanner { exec.next(); exec.close(); - Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, ctx.getOutputPath()); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(outputMeta, rootNode.getOutSchema(), + ctx.getOutputPath()); scanner.init(); Tuple tuple; int i = 0; @@ -457,7 +454,7 @@ public class TestPhysicalPlanner { ctx.setDataChannel(dataChannel); LogicalNode rootNode = optimizer.optimize(plan); - TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV); + TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); FileSystem fs = sm.getFileSystem(); @@ -474,9 +471,9 @@ public class TestPhysicalPlanner { Fragment [] fragments = new Fragment[list.length]; int i = 0; for (FileStatus status : list) { - fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen()); + fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen()); } - Scanner scanner = new MergeScanner(conf, outputMeta,TUtil.newList(fragments)); + Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments)); scanner.init(); Tuple tuple; @@ -515,8 +512,7 @@ public class TestPhysicalPlanner { ctx.setDataChannel(dataChannel); optimizer.optimize(plan); - TableMeta outputMeta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), - StoreType.CSV); + TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); @@ -533,9 +529,9 @@ public class TestPhysicalPlanner { Fragment [] fragments = new Fragment[list.length]; int i = 0; for (FileStatus status : list) { - fragments[i++] = new Fragment("partition", status.getPath(), outputMeta, 0, status.getLen()); + fragments[i++] = new Fragment("partition", status.getPath(), 0, status.getLen()); } - Scanner scanner = new MergeScanner(conf, outputMeta,TUtil.newList(fragments)); + Scanner scanner = new MergeScanner(conf, outputMeta,rootNode.getOutSchema(), TUtil.newList(fragments)); scanner.init(); Tuple tuple; i = 0; @@ -798,9 +794,9 @@ public class TestPhysicalPlanner { keySchema, comp); reader.open(); Path outputPath = StorageUtil.concatPath(workDir, "output", "output"); - TableMeta meta = CatalogUtil.newTableMeta(rootNode.getOutSchema(), StoreType.CSV, new Options()); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV, new Options()); SeekableScanner scanner = - 
StorageManagerFactory.getSeekableScanner(conf, meta, outputPath); + StorageManagerFactory.getSeekableScanner(conf, meta, employee.getSchema(), outputPath); scanner.init(); int cnt = 0; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java index f95c3ec..c95881a 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java @@ -19,8 +19,8 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.worker.TaskAttemptContext; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; @@ -37,15 +37,14 @@ import org.apache.tajo.engine.planner.logical.LogicalNode; import org.apache.tajo.storage.*; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; +import org.apache.tajo.worker.TaskAttemptContext; import org.junit.After; import org.junit.Before; import org.junit.Test; -import org.apache.tajo.LocalTajoTestingUtility; import java.io.IOException; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; // this is not a physical operator in itself, but it uses the HashLeftOuterJoinExec with switched inputs order public class TestRightOuterHashJoinExec { @@ -91,12 +90,11 @@ public class TestRightOuterHashJoinExec { dep3Schema.addColumn("loc_id", Type.INT4); - TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, - StoreType.CSV); + TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path); + Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); - Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum()); + Tuple tuple = new VTuple(dep3Schema.getColumnNum()); for (int i = 0; i < 10; i++) { tuple.put(new Datum[] { DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i), @@ -106,7 +104,7 @@ public class TestRightOuterHashJoinExec { appender1.flush(); appender1.close(); - dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path); + dep3 = CatalogUtil.newTableDesc("dep3", dep3Schema, dep3Meta, dep3Path); catalog.addTable(dep3); //----------------- job3 ------------------------------ @@ -121,12 +119,11 @@ public class TestRightOuterHashJoinExec { job3Schema.addColumn("job_title", Type.TEXT); - TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema, - StoreType.CSV); + TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path); + Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, 
job3Schema, job3Path); appender2.init(); - Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum()); + Tuple tuple2 = new VTuple(job3Schema.getColumnNum()); for (int i = 1; i < 4; i++) { int x = 100 + i; tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i), @@ -136,7 +133,7 @@ public class TestRightOuterHashJoinExec { appender2.flush(); appender2.close(); - job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path); + job3 = CatalogUtil.newTableDesc("job3", job3Schema, job3Meta, job3Path); catalog.addTable(job3); @@ -161,11 +158,11 @@ public class TestRightOuterHashJoinExec { emp3Schema.addColumn("job_id", Type.INT4); - TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV); + TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path); + Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); - Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum()); + Tuple tuple3 = new VTuple(emp3Schema.getColumnNum()); for (int i = 1; i < 4; i += 2) { int x = 10 + i; @@ -200,7 +197,7 @@ public class TestRightOuterHashJoinExec { appender3.flush(); appender3.close(); - emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path); + emp3 = CatalogUtil.newTableDesc("emp3", emp3Schema, emp3Meta, emp3Path); catalog.addTable(emp3); analyzer = new SQLAnalyzer(); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java index 3918734..bf08479 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java @@ -94,12 +94,11 @@ public class TestRightOuterMergeJoinExec { dep3Schema.addColumn("loc_id", Type.INT4); - TableMeta dep3Meta = CatalogUtil.newTableMeta(dep3Schema, - StoreType.CSV); + TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path dep3Path = new Path(testDir, "dep3.csv"); - Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Path); + Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path); appender1.init(); - Tuple tuple = new VTuple(dep3Meta.getSchema().getColumnNum()); + Tuple tuple = new VTuple(dep3Schema.getColumnNum()); for (int i = 0; i < 10; i++) { tuple.put(new Datum[] { DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i), @@ -109,7 +108,7 @@ public class TestRightOuterMergeJoinExec { appender1.flush(); appender1.close(); - dep3 = CatalogUtil.newTableDesc("dep3", dep3Meta, dep3Path); + dep3 = CatalogUtil.newTableDesc("dep3", dep3Schema, dep3Meta, dep3Path); catalog.addTable(dep3); @@ -133,12 +132,11 @@ public class TestRightOuterMergeJoinExec { dep4Schema.addColumn("loc_id", Type.INT4); - TableMeta dep4Meta = CatalogUtil.newTableMeta(dep4Schema, - StoreType.CSV); + TableMeta dep4Meta = 
CatalogUtil.newTableMeta(StoreType.CSV); Path dep4Path = new Path(testDir, "dep4.csv"); - Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Path); + Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path); appender4.init(); - Tuple tuple4 = new VTuple(dep4Meta.getSchema().getColumnNum()); + Tuple tuple4 = new VTuple(dep4Schema.getColumnNum()); for (int i = 0; i < 11; i++) { tuple4.put(new Datum[] { DatumFactory.createInt4(i), DatumFactory.createText("dept_" + i), @@ -148,7 +146,7 @@ public class TestRightOuterMergeJoinExec { appender4.flush(); appender4.close(); - dep4 = CatalogUtil.newTableDesc("dep4", dep4Meta, dep4Path); + dep4 = CatalogUtil.newTableDesc("dep4", dep4Schema, dep4Meta, dep4Path); catalog.addTable(dep4); @@ -165,12 +163,11 @@ public class TestRightOuterMergeJoinExec { job3Schema.addColumn("job_title", Type.TEXT); - TableMeta job3Meta = CatalogUtil.newTableMeta(job3Schema, - StoreType.CSV); + TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path job3Path = new Path(testDir, "job3.csv"); - Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Path); + Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path); appender2.init(); - Tuple tuple2 = new VTuple(job3Meta.getSchema().getColumnNum()); + Tuple tuple2 = new VTuple(job3Schema.getColumnNum()); for (int i = 1; i < 4; i++) { int x = 100 + i; tuple2.put(new Datum[] { DatumFactory.createInt4(100 + i), @@ -180,7 +177,7 @@ public class TestRightOuterMergeJoinExec { appender2.flush(); appender2.close(); - job3 = CatalogUtil.newTableDesc("job3", job3Meta, job3Path); + job3 = CatalogUtil.newTableDesc("job3", job3Schema, job3Meta, job3Path); catalog.addTable(job3); @@ -205,11 +202,11 @@ public class TestRightOuterMergeJoinExec { emp3Schema.addColumn("job_id", Type.INT4); - TableMeta emp3Meta = CatalogUtil.newTableMeta(emp3Schema, StoreType.CSV); + TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path emp3Path = new Path(testDir, "emp3.csv"); - Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Path); + Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path); appender3.init(); - Tuple tuple3 = new VTuple(emp3Meta.getSchema().getColumnNum()); + Tuple tuple3 = new VTuple(emp3Schema.getColumnNum()); for (int i = 1; i < 4; i += 2) { int x = 10 + i; @@ -244,7 +241,7 @@ public class TestRightOuterMergeJoinExec { appender3.flush(); appender3.close(); - emp3 = CatalogUtil.newTableDesc("emp3", emp3Meta, emp3Path); + emp3 = CatalogUtil.newTableDesc("emp3", emp3Schema, emp3Meta, emp3Path); catalog.addTable(emp3); //---------------------phone3 -------------------- @@ -257,15 +254,15 @@ public class TestRightOuterMergeJoinExec { phone3Schema.addColumn("phone_number", Type.TEXT); - TableMeta phone3Meta = CatalogUtil.newTableMeta(phone3Schema, - StoreType.CSV); + TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV); Path phone3Path = new Path(testDir, "phone3.csv"); - Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Path); + Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema, + phone3Path); appender5.init(); appender5.flush(); appender5.close(); - phone3 = CatalogUtil.newTableDesc("phone3", phone3Meta, phone3Path); + phone3 = 
CatalogUtil.newTableDesc("phone3", phone3Schema, phone3Meta, phone3Path); catalog.addTable(phone3); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java index 339da1b..3dc3a4a 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java @@ -72,14 +72,14 @@ public class TestSortExec { schema.addColumn("empId", Type.INT4); schema.addColumn("deptName", Type.TEXT); - employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV); + employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); tablePath = StorageUtil.concatPath(workDir, "employee", "table1"); sm.getFileSystem().mkdirs(tablePath.getParent()); - Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, tablePath); appender.init(); - Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum()); + Tuple tuple = new VTuple(schema.getColumnNum()); for (int i = 0; i < 100; i++) { tuple.put(new Datum[] { DatumFactory.createInt4(rnd.nextInt(5)), @@ -90,7 +90,7 @@ public class TestSortExec { appender.flush(); appender.close(); - TableDesc desc = new TableDescImpl("employee", employeeMeta, tablePath); + TableDesc desc = new TableDesc("employee", schema, employeeMeta, tablePath); catalog.addTable(desc); analyzer = new SQLAnalyzer(); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java index f049185..c005551 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestInsertQuery.java @@ -64,7 +64,7 @@ public class TestInsertQuery { res.close(); TableDesc desc = catalog.getTableDesc(tableName); - assertEquals(5, desc.getMeta().getStat().getNumRows().intValue()); + assertEquals(5, desc.getStats().getNumRows().intValue()); } @Test @@ -80,8 +80,8 @@ public class TestInsertQuery { res = tpch.execute("insert overwrite into " + tableName + " select l_orderkey from lineitem"); res.close(); TableDesc desc = catalog.getTableDesc(tableName); - assertEquals(5, desc.getMeta().getStat().getNumRows().intValue()); - assertEquals(originalDesc.getMeta().getSchema(), desc.getMeta().getSchema()); + assertEquals(5, desc.getStats().getNumRows().intValue()); + assertEquals(originalDesc.getSchema(), desc.getSchema()); } @Test @@ -98,7 +98,7 @@ public class TestInsertQuery { + " (col1, col3) select l_orderkey, l_quantity from lineitem"); res.close(); TableDesc desc = catalog.getTableDesc(tableName); - assertEquals(5, desc.getMeta().getStat().getNumRows().intValue()); + 
assertEquals(5, desc.getStats().getNumRows().intValue()); res = tpch.execute("select * from " + tableName); assertTrue(res.next()); @@ -133,7 +133,7 @@ public class TestInsertQuery { assertFalse(res.next()); res.close(); - assertEquals(originalDesc.getMeta().getSchema(), desc.getMeta().getSchema()); + assertEquals(originalDesc.getSchema(), desc.getSchema()); } @Test @@ -148,7 +148,7 @@ public class TestInsertQuery { res = tpch.execute("insert overwrite into " + tableName + " select * from lineitem where l_orderkey = 3"); res.close(); TableDesc desc = catalog.getTableDesc(tableName); - assertEquals(2, desc.getMeta().getStat().getNumRows().intValue()); + assertEquals(2, desc.getStats().getNumRows().intValue()); } @Test @@ -163,7 +163,7 @@ public class TestInsertQuery { CatalogService catalog = cluster.getMaster().getCatalog(); assertTrue(catalog.existsTable(tableName)); TableDesc orderKeys = catalog.getTableDesc(tableName); - assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue()); + assertEquals(5, orderKeys.getStats().getNumRows().intValue()); // this query will result in the two rows. res = tpch.execute( @@ -173,7 +173,7 @@ public class TestInsertQuery { assertTrue(catalog.existsTable(tableName)); orderKeys = catalog.getTableDesc(tableName); - assertEquals(2, orderKeys.getMeta().getStat().getNumRows().intValue()); + assertEquals(2, orderKeys.getStats().getNumRows().intValue()); } @Test @@ -188,7 +188,7 @@ public class TestInsertQuery { res = tpch.execute("insert overwrite into " + tableName + " select * from lineitem where l_orderkey = 3"); res.close(); TableDesc desc = catalog.getTableDesc(tableName); - assertEquals(2, desc.getMeta().getStat().getNumRows().intValue()); + assertEquals(2, desc.getStats().getNumRows().intValue()); } @Test @@ -213,7 +213,7 @@ public class TestInsertQuery { res = tpch.execute("insert overwrite into " + tableName + " select l_orderkey, l_partkey, l_quantity from lineitem where l_orderkey = 3"); res.close(); TableDesc desc = catalog.getTableDesc(tableName); - assertEquals(2, desc.getMeta().getStat().getNumRows().intValue()); + assertEquals(2, desc.getStats().getNumRows().intValue()); FileSystem fs = FileSystem.get(tpch.getTestingCluster().getConfiguration()); assertTrue(fs.exists(desc.getPath())); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java index 643d363..48c424d 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestResultSetImpl.java @@ -27,7 +27,7 @@ import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.TpchTestBase; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.client.ResultSetImpl; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; @@ -51,6 +51,7 @@ public class TestResultSetImpl { private static TableDesc desc; private static AbstractStorageManager sm; private static TableMeta scoreMeta; + private 
static Schema scoreSchema; @BeforeClass public static void setup() throws Exception { @@ -58,15 +59,16 @@ public class TestResultSetImpl { conf = util.getConfiguration(); sm = StorageManagerFactory.getStorageManager(conf); - Schema scoreSchema = new Schema(); + scoreSchema = new Schema(); scoreSchema.addColumn("deptname", Type.TEXT); scoreSchema.addColumn("score", Type.INT4); - scoreMeta = CatalogUtil.newTableMeta(scoreSchema, StoreType.CSV); - TableStat stat = new TableStat(); + scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV); + TableStats stats = new TableStats(); Path p = sm.getTablePath("score"); sm.getFileSystem().mkdirs(p); - Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, new Path(p, "score")); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, + new Path(p, "score")); appender.init(); int deptSize = 100; int tupleNum = 10000; @@ -81,13 +83,13 @@ public class TestResultSetImpl { appender.addTuple(tuple); } appender.close(); - stat.setNumRows(tupleNum); - stat.setNumBytes(written); - stat.setAvgRows(tupleNum); - stat.setNumBlocks(1000); - stat.setNumPartitions(100); - scoreMeta.setStat(stat); - desc = new TableDescImpl("score", scoreMeta, p); + stats.setNumRows(tupleNum); + stats.setNumBytes(written); + stats.setAvgRows(tupleNum); + stats.setNumBlocks(1000); + stats.setNumPartitions(100); + desc = new TableDesc("score", scoreSchema, scoreMeta, p); + desc.setStats(stats); } @AfterClass @@ -100,7 +102,7 @@ public class TestResultSetImpl { ResultSetImpl rs = new ResultSetImpl(null, null, conf, desc); ResultSetMetaData meta = rs.getMetaData(); assertNotNull(meta); - Schema schema = scoreMeta.getSchema(); + Schema schema = scoreSchema; assertEquals(schema.getColumnNum(), meta.getColumnCount()); for (int i = 0; i < meta.getColumnCount(); i++) { assertEquals(schema.getColumn(i).getColumnName(), meta.getColumnName(i + 1)); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index 0b3a8ce..2547714 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -456,7 +456,7 @@ public class TestSelectQuery { CatalogService catalog = cluster.getMaster().getCatalog(); assertTrue(catalog.existsTable("orderkeys")); TableDesc orderKeys = catalog.getTableDesc("orderkeys"); - assertEquals(5, orderKeys.getMeta().getStat().getNumRows().intValue()); + assertEquals(5, orderKeys.getStats().getNumRows().intValue()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java index 51dc8fe..c3b5aa3 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java +++ 
b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestExecutionBlockCursor.java @@ -24,7 +24,7 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.LogicalOptimizer; @@ -64,9 +64,9 @@ public class TestExecutionBlockCursor { tpch.loadSchemas(); tpch.loadOutSchema(); for (String table : tpch.getTableNames()) { - TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(table), CatalogProtos.StoreType.CSV); - m.setStat(new TableStat()); - TableDesc d = CatalogUtil.newTableDesc(table, m, CommonTestingUtil.getTestDir()); + TableMeta m = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV); + TableDesc d = CatalogUtil.newTableDesc(table, tpch.getSchema(table), m, CommonTestingUtil.getTestDir()); + d.setStats(new TableStats()); catalog.addTable(d); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java index e85b54a..ca1247a 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/master/TestGlobalPlanner.java @@ -24,7 +24,7 @@ import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.parser.SQLAnalyzer; import org.apache.tajo.engine.planner.LogicalOptimizer; import org.apache.tajo.engine.planner.LogicalPlan; @@ -72,12 +72,11 @@ public class TestGlobalPlanner { tpch.loadSchemas(); tpch.loadOutSchema(); for (int i = 0; i < tables.length; i++) { - TableMeta m = CatalogUtil.newTableMeta(tpch.getSchema(tables[i]), CatalogProtos.StoreType.CSV); - TableStat stat = new TableStat(); - stat.setNumBytes(volumes[i]); - m.setStat(stat); - - TableDesc d = CatalogUtil.newTableDesc(tables[i], m, CommonTestingUtil.getTestDir()); + TableMeta m = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV); + TableStats stats = new TableStats(); + stats.setNumBytes(volumes[i]); + TableDesc d = CatalogUtil.newTableDesc(tables[i], tpch.getSchema(tables[i]), m, CommonTestingUtil.getTestDir()); + d.setStats(stats); catalog.addTable(d); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java index 5ff6844..630acd1 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestFragment.java @@ -20,14 
+20,11 @@ package org.apache.tajo.storage; import com.google.common.collect.Sets; import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.util.CommonTestingUtil; import org.junit.Before; import org.junit.Test; -import org.apache.tajo.catalog.CatalogUtil; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.common.TajoDataTypes.Type; import java.util.Arrays; import java.util.SortedSet; @@ -37,7 +34,6 @@ import static org.junit.Assert.assertTrue; public class TestFragment { private Schema schema1; - private TableMeta meta1; private Path path; @Before @@ -45,47 +41,29 @@ public class TestFragment { schema1 = new Schema(); schema1.addColumn("id", Type.INT4); schema1.addColumn("name", Type.TEXT); - meta1 = CatalogUtil.newTableMeta(schema1, StoreType.CSV); path = CommonTestingUtil.getTestDir(); } @Test - public final void testGetAndSetFields() { - Fragment fragment1 = new Fragment("table1_1", new Path(path, "table0"), meta1, 0, 500); + public final void testGetAndSetFields() { + Fragment fragment1 = new Fragment("table1_1", new Path(path, "table0"), 0, 500); fragment1.setDistCached(); assertEquals("table1_1", fragment1.getName()); assertEquals(new Path(path, "table0"), fragment1.getPath()); - assertEquals(meta1.getStoreType(), fragment1.getMeta().getStoreType()); - assertEquals(meta1.getSchema().getColumnNum(), - fragment1.getMeta().getSchema().getColumnNum()); assertTrue(fragment1.isDistCached()); - for(int i=0; i < meta1.getSchema().getColumnNum(); i++) { - assertEquals(meta1.getSchema().getColumn(i).getColumnName(), - fragment1.getMeta().getSchema().getColumn(i).getColumnName()); - assertEquals(meta1.getSchema().getColumn(i).getDataType(), - fragment1.getMeta().getSchema().getColumn(i).getDataType()); - } assertTrue(0 == fragment1.getStartOffset()); assertTrue(500 == fragment1.getLength()); } @Test public final void testTabletTabletProto() { - Fragment fragment0 = new Fragment("table1_1", new Path(path, "table0"), meta1, 0, 500); - + Fragment fragment0 = new Fragment("table1_1", new Path(path, "table0"), 0, 500); + fragment0.setDistCached(); Fragment fragment1 = new Fragment(fragment0.getProto()); assertEquals("table1_1", fragment1.getName()); assertEquals(new Path(path, "table0"), fragment1.getPath()); - assertEquals(meta1.getStoreType(), fragment1.getMeta().getStoreType()); - assertEquals(meta1.getSchema().getColumnNum(), - fragment1.getMeta().getSchema().getColumnNum()); - for(int i=0; i < meta1.getSchema().getColumnNum(); i++) { - assertEquals(meta1.getSchema().getColumn(i).getColumnName(), - fragment1.getMeta().getSchema().getColumn(i).getColumnName()); - assertEquals(meta1.getSchema().getColumn(i).getDataType(), - fragment1.getMeta().getSchema().getColumn(i).getDataType()); - } + assertTrue(fragment1.isDistCached()); assertTrue(0 == fragment1.getStartOffset()); assertTrue(500 == fragment1.getLength()); } @@ -95,7 +73,7 @@ public class TestFragment { final int num = 10; Fragment [] tablets = new Fragment[num]; for (int i = num - 1; i >= 0; i--) { - tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), meta1, i * 500, (i+1) * 500); + tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), i * 500, (i+1) * 500); } Arrays.sort(tablets); @@ -110,7 +88,7 @@ public class TestFragment { final int num = 1860; Fragment [] tablets = new Fragment[num]; for (int i = num 
- 1; i >= 0; i--) { - tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), meta1, (long)i * 6553500, (long)(i+1) * 6553500); + tablets[i] = new Fragment("tablet1_"+i, new Path(path, "tablet0"), (long)i * 6553500, (long)(i+1) * 6553500); } SortedSet sortedSet = Sets.newTreeSet(); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java index 147fe1d..2678a54 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/storage/TestRowFile.java @@ -27,10 +27,9 @@ import org.apache.tajo.TpchTestBase; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.TableMetaImpl; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.catalog.proto.CatalogProtos.TableProto; -import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; @@ -68,7 +67,7 @@ public class TestRowFile { schema.addColumn("age", Type.INT8); schema.addColumn("description", Type.TEXT); - TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.ROWFILE); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.ROWFILE); AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, new Path(conf.getVar(ConfVars.ROOT_DIR))); @@ -80,7 +79,7 @@ public class TestRowFile { FileUtil.writeProto(fs, metaPath, meta.getProto()); - Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, dataPath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, dataPath); appender.enableStats(); appender.init(); @@ -100,18 +99,18 @@ public class TestRowFile { } appender.close(); - TableStat stat = appender.getStats(); + TableStats stat = appender.getStats(); assertEquals(tupleNum, stat.getNumRows().longValue()); FileStatus file = fs.getFileStatus(dataPath); TableProto proto = (TableProto) FileUtil.loadProto( cluster.getDefaultFileSystem(), metaPath, TableProto.getDefaultInstance()); - meta = new TableMetaImpl(proto); - Fragment fragment = new Fragment("test.tbl", dataPath, meta, 0, file.getLen()); + meta = new TableMeta(proto); + Fragment fragment = new Fragment("test.tbl", dataPath, 0, file.getLen()); int tupleCnt = 0; start = System.currentTimeMillis(); - Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment); scanner.init(); while ((tuple=scanner.next()) != null) { tupleCnt++; @@ -125,8 +124,8 @@ public class TestRowFile { long fileLen = file.getLen()/13; for (int i = 0; i < 13; i++) { - fragment = new Fragment("test.tbl", dataPath, meta, fileStart, fileLen); - scanner = new RowFile.RowFileScanner(conf, meta, fragment); + fragment = new Fragment("test.tbl", dataPath, fileStart, fileLen); + scanner = new RowFile.RowFileScanner(conf, meta, schema, fragment); scanner.init(); while 
((tuple=scanner.next()) != null) { if (!idSet.remove(tuple.get(0).asInt4())) { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java index 307475a..3b568fc 100644 --- a/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java +++ b/tajo-core/tajo-core-backend/src/test/java/org/apache/tajo/worker/TestRangeRetrieverHandler.java @@ -100,14 +100,14 @@ public class TestRangeRetrieverHandler { Tuple firstTuple = null; Tuple lastTuple; - TableMeta employeeMeta = CatalogUtil.newTableMeta(schema, StoreType.CSV); + TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV); Path tableDir = StorageUtil.concatPath(testDir, "testGet", "table.csv"); fs.mkdirs(tableDir.getParent()); - Appender appender = sm.getAppender(employeeMeta, tableDir); + Appender appender = sm.getAppender(employeeMeta, schema, tableDir); appender.init(); - Tuple tuple = new VTuple(employeeMeta.getSchema().getColumnNum()); + Tuple tuple = new VTuple(schema.getColumnNum()); for (int i = 0; i < TEST_TUPLE; i++) { tuple.put( new Datum[] { @@ -124,7 +124,7 @@ public class TestRangeRetrieverHandler { appender.flush(); appender.close(); - TableDesc employee = new TableDescImpl("employee", employeeMeta, tableDir); + TableDesc employee = new TableDesc("employee", schema, employeeMeta, tableDir); catalog.addTable(employee); Fragment[] frags = StorageManager.splitNG(conf, "employee", employeeMeta, tableDir, Integer.MAX_VALUE); @@ -158,7 +158,7 @@ public class TestRangeRetrieverHandler { new Path(testDir, "output/index"), keySchema, comp); reader.open(); - SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta, + SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, employeeMeta, schema, StorageUtil.concatPath(testDir, "output", "output")); scanner.init(); @@ -215,12 +215,12 @@ public class TestRangeRetrieverHandler { Tuple firstTuple = null; Tuple lastTuple; - TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV); + TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV); Path tablePath = StorageUtil.concatPath(testDir, "testGetFromDescendingOrder", "table.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = sm.getAppender(meta, tablePath); + Appender appender = sm.getAppender(meta, schema, tablePath); appender.init(); - Tuple tuple = new VTuple(meta.getSchema().getColumnNum()); + Tuple tuple = new VTuple(schema.getColumnNum()); for (int i = (TEST_TUPLE - 1); i >= 0 ; i--) { tuple.put( new Datum[] { @@ -237,7 +237,7 @@ public class TestRangeRetrieverHandler { appender.flush(); appender.close(); - TableDesc employee = new TableDescImpl("employee", meta, tablePath); + TableDesc employee = new TableDesc("employee", schema, meta, tablePath); catalog.addTable(employee); Fragment[] frags = sm.splitNG(conf, "employee", meta, tablePath, Integer.MAX_VALUE); @@ -269,7 +269,7 @@ public class TestRangeRetrieverHandler { BSTIndex.BSTIndexReader reader = bst.getIndexReader( new Path(testDir, "output/index"), keySchema, comp); reader.open(); - SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, + SeekableScanner scanner = 
StorageManagerFactory.getSeekableScanner(conf, meta, schema, StorageUtil.concatPath(testDir, "output", "output")); scanner.init(); int cnt = 0; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java index ef02d2b..0e695a3 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java @@ -28,15 +28,16 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.TableMetaImpl; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.storage.annotation.ForSplitableStore; import org.apache.tajo.util.Bytes; import org.apache.tajo.util.FileUtil; import java.io.FileNotFoundException; import java.io.IOException; import java.lang.reflect.Constructor; +import java.lang.reflect.Method; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -67,8 +68,9 @@ public abstract class AbstractStorageManager { private static final Map, Constructor> CONSTRUCTOR_CACHE = new ConcurrentHashMap, Constructor>(); - public abstract Scanner getScanner(TableMeta meta, Fragment fragment, - Schema target) throws IOException; + public abstract Class getScannerClass(CatalogProtos.StoreType storeType) throws IOException; + + public abstract Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException; protected AbstractStorageManager(TajoConf conf) throws IOException { this.conf = conf; @@ -80,17 +82,17 @@ public abstract class AbstractStorageManager { LOG.warn("does not support block metadata. 
('dfs.datanode.hdfs-blocks-metadata.enabled')"); } - public Scanner getScanner(TableMeta meta, Path path) + public Scanner getScanner(TableMeta meta, Schema schema, Path path) throws IOException { FileSystem fs = path.getFileSystem(conf); FileStatus status = fs.getFileStatus(path); - Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen()); - return getScanner(meta, fragment); + Fragment fragment = new Fragment(path.getName(), path, 0, status.getLen()); + return getScanner(meta, schema, fragment); } - public Scanner getScanner(TableMeta meta, Fragment fragment) + public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { - return getScanner(meta, fragment, meta.getSchema()); + return getScanner(meta, schema, fragment, schema); } public FileSystem getFileSystem() { @@ -129,7 +131,7 @@ public abstract class AbstractStorageManager { return new Path(tableBaseDir, tableName); } - public Appender getAppender(TableMeta meta, Path path) + public Appender getAppender(TableMeta meta, Schema schema, Path path) throws IOException { Appender appender; @@ -149,7 +151,7 @@ public abstract class AbstractStorageManager { throw new IOException("Unknown Storage Type: " + meta.getStoreType()); } - appender = newAppenderInstance(appenderClass, conf, meta, path); + appender = newAppenderInstance(appenderClass, conf, meta, schema, path); return appender; } @@ -168,7 +170,7 @@ public abstract class AbstractStorageManager { CatalogProtos.TableProto tableProto = (CatalogProtos.TableProto) FileUtil.loadProto(tableMetaIn, CatalogProtos.TableProto.getDefaultInstance()); - meta = new TableMetaImpl(tableProto); + meta = new TableMeta(tableProto); return meta; } @@ -191,7 +193,7 @@ public abstract class AbstractStorageManager { FileStatus[] fileLists = fs.listStatus(tablePath); for (FileStatus file : fileLists) { - tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen()); + tablet = new Fragment(tablePath.getName(), file.getPath(), 0, file.getLen()); listTablets.add(tablet); } @@ -225,14 +227,14 @@ public abstract class AbstractStorageManager { long start = 0; if (remainFileSize > defaultBlockSize) { while (remainFileSize > defaultBlockSize) { - tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize); + tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize); listTablets.add(tablet); start += defaultBlockSize; remainFileSize -= defaultBlockSize; } - listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize)); + listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize)); } else { - listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize)); + listTablets.add(new Fragment(tableName, file.getPath(), 0, remainFileSize)); } } @@ -257,14 +259,14 @@ public abstract class AbstractStorageManager { long start = 0; if (remainFileSize > defaultBlockSize) { while (remainFileSize > defaultBlockSize) { - tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize); + tablet = new Fragment(tableName, file.getPath(), start, defaultBlockSize); listTablets.add(tablet); start += defaultBlockSize; remainFileSize -= defaultBlockSize; } - listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize)); + listTablets.add(new Fragment(tableName, file.getPath(), start, remainFileSize)); } else { - listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize)); + listTablets.add(new 
Fragment(tableName, file.getPath(), 0, remainFileSize)); } } @@ -389,14 +391,29 @@ public abstract class AbstractStorageManager { * false to ensure that individual input files are never split-up * so that Mappers process entire files. * + * * @param filename the file name to check * @return is this file isSplittable? */ - protected boolean isSplittable(TableMeta meta, Path filename) throws IOException { - Scanner scanner = getScanner(meta, filename); + protected boolean isSplittable(TableMeta meta, Schema schema, Path filename) throws IOException { + Scanner scanner = getScanner(meta, schema, filename); return scanner.isSplittable(); } + protected boolean isSplittable(CatalogProtos.StoreType storeType) throws IOException { + Method[] methods = getScannerClass(storeType).getMethods(); + + for (Method method : methods) { + ForSplitableStore annos = method.getAnnotation(ForSplitableStore.class); + if (annos != null) { + return true; + } + } + + return false; + } + + @Deprecated protected long computeSplitSize(long blockSize, long minSize, long maxSize) { @@ -428,12 +445,12 @@ public abstract class AbstractStorageManager { * by sub-classes to make sub-types */ protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) { - return new Fragment(fragmentId, file, meta, start, length); + return new Fragment(fragmentId, file, start, length); } protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation, int[] diskIds) throws IOException { - return new Fragment(fragmentId, file, meta, blockLocation, diskIds); + return new Fragment(fragmentId, file, blockLocation, diskIds); } // for Non Splittable. eg, compressed gzip TextFile @@ -468,7 +485,7 @@ public abstract class AbstractStorageManager { hosts[i] = entry.getKey(); hostsBlockCount[i] = entry.getValue(); } - return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount); + return new Fragment(fragmentId, file, start, length, hosts, hostsBlockCount); } /** @@ -544,7 +561,7 @@ public abstract class AbstractStorageManager { * * @throws IOException */ - public List getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException { + public List getSplits(String tableName, TableMeta meta, Schema schema, Path inputPath) throws IOException { // generate splits' List splits = new ArrayList(); @@ -555,7 +572,7 @@ public abstract class AbstractStorageManager { long length = file.getLen(); if (length > 0) { BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - boolean splittable = isSplittable(meta, path); + boolean splittable = isSplittable(meta, schema, inputPath); if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { // supported disk volume BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs) @@ -597,19 +614,21 @@ public abstract class AbstractStorageManager { private static final Class[] DEFAULT_SCANNER_PARAMS = { Configuration.class, TableMeta.class, + Schema.class, Fragment.class }; private static final Class[] DEFAULT_APPENDER_PARAMS = { Configuration.class, TableMeta.class, + Schema.class, Path.class }; /** * create a scanner instance. 
*/ - public static T newScannerInstance(Class theClass, Configuration conf, TableMeta meta, + public static T newScannerInstance(Class theClass, Configuration conf, TableMeta meta, Schema schema, Fragment fragment) { T result; try { @@ -619,7 +638,7 @@ public abstract class AbstractStorageManager { meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } - result = meth.newInstance(new Object[]{conf, meta, fragment}); + result = meth.newInstance(new Object[]{conf, meta, schema, fragment}); } catch (Exception e) { throw new RuntimeException(e); } @@ -630,7 +649,7 @@ public abstract class AbstractStorageManager { /** * create a scanner instance. */ - public static T newAppenderInstance(Class theClass, Configuration conf, TableMeta meta, + public static T newAppenderInstance(Class theClass, Configuration conf, TableMeta meta, Schema schema, Path path) { T result; try { @@ -640,7 +659,7 @@ public abstract class AbstractStorageManager { meth.setAccessible(true); CONSTRUCTOR_CACHE.put(theClass, meth); } - result = meth.newInstance(new Object[]{conf, meta, path}); + result = meth.newInstance(new Object[]{conf, meta, schema, path}); } catch (Exception e) { throw new RuntimeException(e); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java index 6720614..ed6ea34 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Appender.java @@ -18,7 +18,7 @@ package org.apache.tajo.storage; -import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.catalog.statistics.TableStats; import java.io.Closeable; import java.io.IOException; @@ -35,5 +35,5 @@ public interface Appender extends Closeable { void enableStats(); - TableStat getStats(); + TableStats getStats(); } http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java index 550148c..5ac989d 100644 --- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java +++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -31,7 +31,7 @@ import org.apache.hadoop.io.compress.*; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStat; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.datum.CharDatum; import org.apache.tajo.datum.Datum; @@ -39,6 +39,7 @@ import org.apache.tajo.datum.NullDatum; import org.apache.tajo.datum.ProtobufDatum; import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.annotation.ForSplitableStore; import org.apache.tajo.storage.compress.CodecPool; import org.apache.tajo.storage.exception.AlreadyExistsStorageException; 
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 550148c..5ac989d 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.io.compress.*;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.datum.CharDatum;
 import org.apache.tajo.datum.Datum;
@@ -39,6 +39,7 @@ import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatum;
 import org.apache.tajo.datum.protobuf.ProtobufJsonFormat;
 import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.annotation.ForSplitableStore;
 import org.apache.tajo.storage.compress.CodecPool;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.util.Bytes;
@@ -74,12 +75,11 @@ public class CSVFile {
     private byte[] nullChars;
     private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance();
-    public CSVAppender(Configuration conf, final TableMeta meta,
-                       final Path path) throws IOException {
-      super(conf, meta, path);
+    public CSVAppender(Configuration conf, final TableMeta meta, final Schema schema, final Path path) throws IOException {
+      super(conf, meta, schema, path);
       this.fs = path.getFileSystem(conf);
       this.meta = meta;
-      this.schema = meta.getSchema();
+      this.schema = schema;
       this.delimiter = StringEscapeUtils.unescapeJava(this.meta.getOption(DELIMITER, DELIMITER_DEFAULT)).charAt(0);
       String nullCharacters = StringEscapeUtils.unescapeJava(this.meta.getOption(NULL));
@@ -298,7 +298,7 @@ public class CSVFile {
     }
     @Override
-    public TableStat getStats() {
+    public TableStats getStats() {
       if (enabledStats) {
         return stats.getTableStat();
       } else {
@@ -315,10 +315,11 @@ public class CSVFile {
     }
   }
+  @ForSplitableStore
   public static class CSVScanner extends FileScanner implements SeekableScanner {
-    public CSVScanner(Configuration conf, final TableMeta meta,
-                      final Fragment fragment) throws IOException {
-      super(conf, meta, fragment);
+    public CSVScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment)
+        throws IOException {
+      super(conf, meta, schema, fragment);
       factory = new CompressionCodecFactory(conf);
       codec = factory.getCodec(fragment.getPath());
       if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
@@ -327,10 +328,10 @@ public class CSVFile {
       // Buffer size, Delimiter
       this.bufSize = DEFAULT_BUFFER_SIZE;
-      String delim = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+      String delim = meta.getOption(DELIMITER, DELIMITER_DEFAULT);
       this.delimiter = StringEscapeUtils.unescapeJava(delim).charAt(0);
-      String nullCharacters = StringEscapeUtils.unescapeJava(fragment.getMeta().getOption(NULL));
+      String nullCharacters = StringEscapeUtils.unescapeJava(meta.getOption(NULL));
       if (StringUtils.isEmpty(nullCharacters)) {
         nullChars = NullDatum.get().asTextBytes();
       } else {
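A side note on the delimiter handling in the hunks above: the delimiter and null-literal options are stored as escaped strings and are run through StringEscapeUtils before use. A tiny standalone demo of that unescaping, assuming the commons-lang StringEscapeUtils that CSVFile already relies on (the option values here are made up for illustration):

import org.apache.commons.lang.StringEscapeUtils;

public class DelimiterOptionDemo {
  public static void main(String[] args) {
    // A delimiter stored in table options as the two characters '\' and 't'
    // is unescaped into a single TAB character before parsing.
    char tab = StringEscapeUtils.unescapeJava("\\t").charAt(0);
    System.out.println((int) tab);    // 9

    // A plain single-character delimiter such as '|' passes through unchanged.
    char pipe = StringEscapeUtils.unescapeJava("|").charAt(0);
    System.out.println(pipe);         // |
  }
}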
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
index ca7c898..55a9bf3 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -35,10 +35,10 @@ public abstract class FileAppender implements Appender {
   protected boolean enabledStats;
-  public FileAppender(Configuration conf, TableMeta meta, Path path) {
+  public FileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) {
     this.conf = conf;
     this.meta = meta;
-    this.schema = meta.getSchema();
+    this.schema = schema;
     this.path = path;
   }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index 005879a..9c22b4f 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -35,10 +35,10 @@ public abstract class FileScanner implements Scanner {
   protected Column [] targets;
-  public FileScanner(Configuration conf, final TableMeta meta, final Fragment fragment) {
+  public FileScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment) {
     this.conf = conf;
     this.meta = meta;
-    this.schema = meta.getSchema();
+    this.schema = schema;
     this.fragment = fragment;
     this.columnNum = this.schema.getColumnNum();
   }
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
index 6c31247..f40c9a7 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/Fragment.java
@@ -19,15 +19,10 @@
 package org.apache.tajo.storage;
 import com.google.common.base.Objects;
-import com.google.gson.Gson;
 import com.google.gson.annotations.Expose;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto;
-import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto;
-import org.apache.tajo.json.GsonObject;
-import org.apache.tajo.storage.json.StorageGsonHelper;
 import org.apache.tajo.util.TUtil;
 import java.io.IOException;
@@ -35,12 +30,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-public class Fragment implements TableDesc, Comparable, SchemaObject, GsonObject {
+public class Fragment implements Comparable {
   protected FragmentProto.Builder builder = null;
   @Expose private String tableName; // required
   @Expose private Path uri; // required
-  @Expose private TableMeta meta; // required
   @Expose private Long startOffset; // required
   @Expose private Long length; // required
   @Expose private boolean distCached = false; // optional
@@ -53,47 +47,35 @@ public class Fragment implements TableDesc, Comparable, SchemaObject,
     builder = FragmentProto.newBuilder();
   }
-  public Fragment(String tableName, Path uri, TableMeta meta, BlockLocation blockLocation, int[] diskIds) throws IOException {
+  public Fragment(String tableName, Path uri, BlockLocation blockLocation, int[] diskIds)
+      throws IOException {
     this();
-    //TableMeta newMeta = new TableMetaImpl(meta.getProto());
-    TableMeta newMeta = meta;
-    SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
-        .getSchema().getProto());
-    newMeta.setSchema(new Schema(newSchemaProto));
-    this.set(tableName, uri, newMeta, blockLocation.getOffset(), blockLocation.getLength(),
+    this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(),
         blockLocation.getHosts(), diskIds);
   }
   // Non splittable
-  public Fragment(String tableName, Path uri, TableMeta meta, long start, long length, String[] hosts, int[] hostsBlockCount) {
+  public Fragment(String tableName, Path uri, long start, long length, String[] hosts,
+                  int[] hostsBlockCount) {
     this();
-    TableMeta newMeta = new TableMetaImpl(meta.getProto());
-    SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(tableName, meta
-        .getSchema().getProto());
-    newMeta.setSchema(new Schema(newSchemaProto));
-    this.set(tableName, uri, newMeta, start, length, null, null);
+    this.set(tableName, uri, start, length, null, null);
     this.hosts = hosts;
     this.hostsBlockCount = hostsBlockCount;
   }
-  public Fragment(String fragmentId, Path path, TableMeta meta, long start, long length) {
+  public Fragment(String fragmentId, Path path, long start, long length) {
     this();
-    TableMeta newMeta = new TableMetaImpl(meta.getProto());
-    SchemaProto newSchemaProto = CatalogUtil.getQualfiedSchema(fragmentId, meta
-        .getSchema().getProto());
-    newMeta.setSchema(new Schema(newSchemaProto));
-    this.set(fragmentId, path, newMeta, start, length, null, null);
+    this.set(fragmentId, path, start, length, null, null);
   }
   public Fragment(FragmentProto proto) {
     this();
-    TableMeta newMeta = new TableMetaImpl(proto.getMeta());
     int[] diskIds = new int[proto.getDiskIdsList().size()];
     int i = 0;
     for(Integer eachValue: proto.getDiskIdsList()) {
       diskIds[i++] = eachValue;
     }
-    this.set(proto.getId(), new Path(proto.getPath()), newMeta,
+    this.set(proto.getId(), new Path(proto.getPath()),
         proto.getStartOffset(), proto.getLength(),
         proto.getHostsList().toArray(new String[]{}),
         diskIds);
@@ -102,11 +84,10 @@ public class Fragment implements TableDesc, Comparable, SchemaObject,
     }
   }
-  private void set(String tableName, Path path, TableMeta meta, long start,
+  private void set(String tableName, Path path, long start,
       long length, String[] hosts, int[] diskIds) {
     this.tableName = tableName;
     this.uri = path;
-    this.meta = meta;
     this.startOffset = start;
     this.length = length;
     this.hosts = hosts;
@@ -152,33 +133,17 @@ public class Fragment implements TableDesc, Comparable, SchemaObject,
     return this.tableName;
   }
-  @Override
   public void setName(String tableName) {
     this.tableName = tableName;
   }
-
-  @Override
+
   public Path getPath() {
     return this.uri;
   }
-  @Override
   public void setPath(Path path) {
     this.uri = path;
   }
-
-  public Schema getSchema() {
-    return getMeta().getSchema();
-  }
-
-  public TableMeta getMeta() {
-    return this.meta;
-  }
-
-  @Override
-  public void setMeta(TableMeta meta) {
-    this.meta = meta;
-  }
   public Long getStartOffset() {
     return this.startOffset;
@@ -243,7 +208,6 @@ public class Fragment implements TableDesc, Comparable, SchemaObject,
     frag.builder = FragmentProto.newBuilder();
     frag.tableName = tableName;
     frag.uri = uri;
-    frag.meta = (TableMeta) (meta != null ? meta.clone() : null);
     frag.distCached = distCached;
     frag.diskIds = diskIds;
     frag.hosts = hosts;
@@ -259,14 +223,12 @@ public class Fragment implements TableDesc, Comparable, SchemaObject,
         + getLength() + ", \"distCached\": " + distCached + "}" ;
   }
-  @Override
   public FragmentProto getProto() {
     if (builder == null) {
       builder = FragmentProto.newBuilder();
     }
     builder.setId(this.tableName);
     builder.setStartOffset(this.startOffset);
-    builder.setMeta(meta.getProto());
     builder.setLength(this.length);
     builder.setPath(this.uri.toString());
     builder.setDistCached(this.distCached);
@@ -284,10 +246,4 @@ public class Fragment implements TableDesc, Comparable, SchemaObject,
     return builder.build();
   }
-
-  @Override
-  public String toJson() {
-    Gson gson = StorageGsonHelper.getInstance();
-    return gson.toJson(this, TableDesc.class);
-  }
 }
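After this change a Fragment carries only an id, a path, an offset, a length and host information; the TableMeta and Schema travel separately from the catalog. The following is a hedged sketch of the resulting call pattern, built only from signatures visible in this diff (the three-argument getScanner overload is the one MergeScanner uses below); the class, method name and file length are illustrative, not part of the commit:

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.Scanner;
import org.apache.tajo.storage.StorageManagerFactory;

public class FragmentUsageSketch {
  public static void scanWholeFile(TajoConf conf, TableMeta meta, Schema schema,
                                   Path file, long fileLength) throws Exception {
    // A fragment is now just (id, path, offset, length); no TableMeta inside.
    Fragment fragment = new Fragment(file.getName(), file, 0, fileLength);

    // The schema is supplied explicitly alongside the meta.
    Scanner scanner = StorageManagerFactory.getStorageManager(conf)
        .getScanner(meta, schema, fragment);
    scanner.init();
    while (scanner.next() != null) {
      // consume tuples
    }
    scanner.close();
  }
}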
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 582c64f..b392e4e 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -33,15 +33,17 @@ import java.util.List;
 public class MergeScanner implements Scanner {
   private Configuration conf;
   private TableMeta meta;
+  private Schema schema;
   private List fragments;
   private Iterator iterator;
   private Fragment currentFragment;
   private Scanner currentScanner;
   private Tuple tuple;
-  public MergeScanner(Configuration conf, TableMeta meta, Collection fragments) {
+  public MergeScanner(Configuration conf, TableMeta meta, Schema schema, Collection fragments) {
     this.conf = conf;
     this.meta = meta;
+    this.schema = schema;
     this.fragments = new ArrayList(fragments);
     iterator = this.fragments.iterator();
   }
@@ -62,7 +64,8 @@ public class MergeScanner implements Scanner {
         currentScanner.close();
       }
       currentFragment = iterator.next();
-      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
+      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, schema,
+          currentFragment);
       currentScanner.init();
       return currentScanner.next();
     } else {
@@ -75,7 +78,8 @@ public class MergeScanner implements Scanner {
     iterator = fragments.iterator();
     if (iterator.hasNext()) {
       currentFragment = iterator.next();
-      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, currentFragment);
+      currentScanner = StorageManagerFactory.getStorageManager((TajoConf)conf).getScanner(meta, schema,
+          currentFragment);
     }
   }
@@ -110,7 +114,7 @@ public class MergeScanner implements Scanner {
   @Override
   public Schema getSchema() {
-    return meta.getSchema();
+    return schema;
   }
   @Override
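MergeScanner now receives the Schema explicitly and forwards it to each per-fragment scanner. A sketch of driving it over a couple of fragments, assuming the usual init()/next()/close() Scanner lifecycle shown in the hunks above and that Tuple lives in org.apache.tajo.storage; the fragments would normally come from the storage manager's split generation, which this diff does not show:

import java.util.Arrays;
import java.util.List;

import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.Fragment;
import org.apache.tajo.storage.MergeScanner;
import org.apache.tajo.storage.Tuple;

public class MergeScanSketch {
  public static long countRows(TajoConf conf, TableMeta meta, Schema schema,
                               Fragment first, Fragment second) throws Exception {
    List<Fragment> fragments = Arrays.asList(first, second);

    // Schema is passed once here and reused for every underlying scanner.
    MergeScanner scanner = new MergeScanner(conf, meta, schema, fragments);
    scanner.init();
    long rows = 0;
    for (Tuple t = scanner.next(); t != null; t = scanner.next()) {
      rows++;
    }
    scanner.close();
    return rows;
  }
}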
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
index d89a2fc..0eb39c1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -24,8 +24,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
@@ -57,15 +58,15 @@ public class RawFile {
     private boolean eof = false;
     private long fileSize;
-    public RawFileScanner(Configuration conf, TableMeta meta, Path path) throws IOException {
-      super(conf, meta, null);
+    public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException {
+      super(conf, meta, schema, null);
       this.path = path;
       init();
     }
     @SuppressWarnings("unused")
-    public RawFileScanner(Configuration conf, TableMeta meta, Fragment fragment) throws IOException {
-      this(conf, meta, fragment.getPath());
+    public RawFileScanner(Configuration conf, TableMeta meta, Schema schema, Fragment fragment) throws IOException {
+      this(conf, meta, schema, fragment.getPath());
     }
     public void init() throws IOException {
@@ -308,8 +309,8 @@ public class RawFile {
     private TableStatistics stats;
-    public RawFileAppender(Configuration conf, TableMeta meta, Path path) throws IOException {
-      super(conf, meta, path);
+    public RawFileAppender(Configuration conf, TableMeta meta, Schema schema, Path path) throws IOException {
+      super(conf, meta, schema, path);
     }
     public void init() throws IOException {
@@ -512,7 +513,7 @@ public class RawFile {
     }
     @Override
-    public TableStat getStats() {
+    public TableStats getStats() {
       if (enabledStats) {
         return stats.getTableStat();
       } else {
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
index d2b6d8e..f379918 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -26,11 +26,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.annotation.ForSplitableStore;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.util.BitArray;
@@ -50,6 +52,7 @@ public class RowFile {
   private final static int DEFAULT_BUFFER_SIZE = 65535;
   public static int SYNC_INTERVAL;
+  @ForSplitableStore
   public static class RowFileScanner extends FileScanner {
     private FileSystem fs;
     private FSDataInputStream in;
@@ -65,9 +68,9 @@ public class RowFile {
     private int numBitsOfNullFlags;
     private long bufferStartPos;
-    public RowFileScanner(Configuration conf, final TableMeta meta,
-                          final Fragment fragment) throws IOException {
-      super(conf, meta, fragment);
+    public RowFileScanner(Configuration conf, final TableMeta meta, final Schema schema, final Fragment fragment)
+        throws IOException {
+      super(conf, meta, schema, fragment);
       SYNC_INTERVAL = conf.getInt(ConfVars.RAWFILE_SYNC_INTERVAL.varname,
@@ -323,9 +326,9 @@ public class RowFile {
     // statistics
     private TableStatistics stats;
-    public RowFileAppender(Configuration conf, final TableMeta meta, final Path path)
+    public RowFileAppender(Configuration conf, final TableMeta meta, final Schema schema, final Path path)
         throws IOException {
-      super(conf, meta, path);
+      super(conf, meta, schema, path);
     }
     public void init() throws IOException {
@@ -495,7 +498,7 @@ public class RowFile {
     }
     @Override
-    public TableStat getStats() {
+    public TableStats getStats() {
       if (enabledStats) {
         return stats.getTableStat();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 9907591..6dc13e5 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -20,6 +20,7 @@ package org.apache.tajo.storage;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.conf.TajoConf;
 import java.io.IOException;
@@ -34,27 +35,28 @@ public class StorageManager extends AbstractStorageManager {
   }
   @Override
-  public Scanner getScanner(TableMeta meta, Fragment fragment,
-                            Schema target) throws IOException {
-    Scanner scanner;
-
-    Class scannerClass;
-
-    String handlerName = meta.getStoreType().name().toLowerCase();
-    scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
+  public Class getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
+    String handlerName = storeType.name().toLowerCase();
+    Class scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
     if (scannerClass == null) {
       scannerClass = conf.getClass(
-          String.format("tajo.storage.scanner-handler.%s.class",
-              meta.getStoreType().name().toLowerCase()), null,
-          Scanner.class);
+          String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class);
       SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
     }
     if (scannerClass == null) {
-      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+      throw new IOException("Unknown Storage Type: " + storeType.name());
    }
-    scanner = newScannerInstance(scannerClass, conf, meta, fragment);
+    return scannerClass;
+  }
+
+  @Override
+  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    Scanner scanner;
+
+    Class scannerClass = getScannerClass(meta.getStoreType());
+    scanner = newScannerInstance(scannerClass, conf, meta, schema, fragment);
     if (scanner.isProjectable()) {
       scanner.setTarget(target.toArray());
     }
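The getScannerClass method above resolves a store type to its scanner implementation through a configuration key of the form tajo.storage.scanner-handler.<type>.class. A small self-contained illustration of that lookup using Hadoop's Configuration.getClass; the interface and class below are hypothetical stand-ins, not Tajo classes:

import org.apache.hadoop.conf.Configuration;

public class ScannerHandlerLookupDemo {
  public interface Scanner {}                       // stand-in for org.apache.tajo.storage.Scanner
  public static class MyScanner implements Scanner {}

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    String storeType = "csv";
    String key = String.format("tajo.storage.scanner-handler.%s.class", storeType);

    // Register a handler class under the per-store-type key ...
    conf.setClass(key, MyScanner.class, Scanner.class);

    // ... and resolve it the same way the hunk above does (null default).
    Class<? extends Scanner> scannerClass = conf.getClass(key, null, Scanner.class);
    System.out.println(scannerClass);               // class ...$MyScanner
  }
}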
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
index 5d6a298..7ebcd5c 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -19,8 +19,6 @@ package org.apache.tajo.storage;
 import com.google.common.collect.Maps;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,7 +34,7 @@ import java.util.Map;
 import static org.apache.tajo.conf.TajoConf.ConfVars;
 public class StorageManagerFactory {
-  private static final Map storageManagers = Maps.newConcurrentMap();
+  private static final Map storageManagers = Maps.newHashMap();
   public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
     return getStorageManager(conf, null);
@@ -83,17 +81,17 @@ public class StorageManagerFactory {
   }
   public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Fragment fragment, Schema schema) throws IOException {
-    return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, fragment, schema);
+      TajoConf conf, TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, schema, fragment, target);
   }
   public static synchronized SeekableScanner getSeekableScanner(
-      TajoConf conf, TableMeta meta, Path path) throws IOException {
+      TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     FileStatus status = fs.getFileStatus(path);
-    Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+    Fragment fragment = new Fragment(path.getName(), path, 0, status.getLen());
-    return getSeekableScanner(conf, meta, fragment, fragment.getSchema());
+    return getSeekableScanner(conf, meta, schema, fragment, schema);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index ddfe93d..9627a5d 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -57,8 +57,7 @@ public class StorageUtil {
     }
   }
-  public static void writeTableMeta(Configuration conf, Path tableroot,
-                                    TableMeta meta) throws IOException {
+  public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException {
     FileSystem fs = tableroot.getFileSystem(conf);
     FSDataOutputStream out = fs.create(new Path(tableroot, ".meta"));
     FileUtil.writeProto(out, meta.getProto());
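The re-signed getSeekableScanner(conf, meta, schema, path) above is the new entry point for seekable reads: the caller supplies the Schema explicitly instead of it being pulled from the Fragment, which no longer knows about schemas. A hedged usage sketch, assuming SeekableScanner follows the same init() lifecycle as the other scanners in this diff:

import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.SeekableScanner;
import org.apache.tajo.storage.StorageManagerFactory;

public class SeekableScanSketch {
  public static SeekableScanner open(TajoConf conf, TableMeta meta, Schema schema,
                                     Path file) throws Exception {
    // meta and schema now travel together from the catalog to the storage layer.
    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, file);
    scanner.init();   // assumption: shared Scanner lifecycle
    return scanner;
  }
}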
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
index 151fd1b..41c9d61 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -19,8 +19,8 @@
 package org.apache.tajo.storage;
 import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.statistics.ColumnStat;
-import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
@@ -97,16 +97,16 @@ public class TableStatistics {
     }
   }
-  public TableStat getTableStat() {
-    TableStat stat = new TableStat();
+  public TableStats getTableStat() {
+    TableStats stat = new TableStats();
-    ColumnStat columnStat;
+    ColumnStats columnStats;
     for (int i = 0; i < schema.getColumnNum(); i++) {
-      columnStat = new ColumnStat(schema.getColumn(i));
-      columnStat.setNumNulls(numNulls[i]);
-      columnStat.setMinValue(minValues.get(i));
-      columnStat.setMaxValue(maxValues.get(i));
-      stat.addColumnStat(columnStat);
+      columnStats = new ColumnStats(schema.getColumn(i));
+      columnStats.setNumNulls(numNulls[i]);
+      columnStats.setMinValue(minValues.get(i));
+      columnStats.setMaxValue(maxValues.get(i));
+      stat.addColumnStat(columnStats);
     }
     stat.setNumRows(this.numRows);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/7dc8de28/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
new file mode 100644
index 0000000..ad19101
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.annotation;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface ForSplitableStore {
+}
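Because the new @ForSplitableStore marker is retained at runtime, code can test whether a scanner class supports splittable stores with ordinary reflection. A small demo; SomeScanner is a hypothetical stand-in for the scanner classes this commit annotates (CSVFile.CSVScanner, RowFile.RowFileScanner):

import org.apache.tajo.storage.annotation.ForSplitableStore;

public class SplitableCheckDemo {
  // Hypothetical scanner class carrying the marker, as the real scanners do after this commit.
  @ForSplitableStore
  static class SomeScanner {}

  public static void main(String[] args) {
    // RetentionPolicy.RUNTIME in the new annotation is what makes this check possible.
    boolean splittable = SomeScanner.class.isAnnotationPresent(ForSplitableStore.class);
    System.out.println(splittable);   // true
  }
}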