Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C5077CEC1 for ; Fri, 13 Sep 2013 15:57:29 +0000 (UTC) Received: (qmail 33569 invoked by uid 500); 13 Sep 2013 08:55:49 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 33347 invoked by uid 500); 13 Sep 2013 08:55:44 -0000 Mailing-List: contact commits-help@tajo.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.incubator.apache.org Delivered-To: mailing list commits@tajo.incubator.apache.org Received: (qmail 33184 invoked by uid 99); 13 Sep 2013 08:55:40 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Sep 2013 08:55:40 +0000 X-ASF-Spam-Status: No, hits=-2000.7 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Fri, 13 Sep 2013 08:55:38 +0000 Received: (qmail 4639 invoked by uid 99); 13 Sep 2013 03:29:57 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 13 Sep 2013 03:29:57 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 9A87016EE6; Fri, 13 Sep 2013 03:29:56 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hyunsik@apache.org To: commits@tajo.incubator.apache.org Date: Fri, 13 Sep 2013 03:29:56 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/4] TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik) X-Virus-Checked: Checked by ClamAV on apache.org Updated Branches: refs/heads/master 5d3966a8c -> 5ad7fbae9 http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml b/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml index 7111eae..f56413a 100644 --- a/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml +++ b/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml @@ -20,37 +20,86 @@ --> + + tajo.storage.manager.v2 + false + + + + tajo.storage.manager.maxReadBytes + 8388608 + + + + + tajo.storage.manager.concurrency.perDisk + 1 + + + tajo.storage.scanner-handler csv,raw,rcfile,row,trevni + + tajo.storage.scanner-handler.csv.class org.apache.tajo.storage.CSVFile$CSVScanner + tajo.storage.scanner-handler.v2.csv.class + org.apache.tajo.storage.v2.CSVFileScanner + + + tajo.storage.scanner-handler.raw.class org.apache.tajo.storage.RawFile$RawFileScanner + tajo.storage.scanner-handler.v2.raw.class + org.apache.tajo.storage.RawFile$RawFileScanner + + + tajo.storage.scanner-handler.rcfile.class org.apache.tajo.storage.rcfile.RCFileWrapper$RCFileScanner + tajo.storage.scanner-handler.v2.rcfile.class + org.apache.tajo.storage.v2.RCFileScanner + + + tajo.storage.scanner-handler.rowfile.class org.apache.tajo.storage.RowFile$RowFileScanner + tajo.storage.scanner-handler.v2.rowfile.class + org.apache.tajo.storage.RowFile$RowFileScanner + + + tajo.storage.scanner-handler.trevni.class org.apache.tajo.storage.trevni.TrevniScanner + + tajo.storage.scanner-handler.v2.trevni.class + org.apache.tajo.storage.trevni.TrevniScanner + + tajo.storage.appender-handler http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java index 42c68b6..401bd9e 100644 --- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java +++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java @@ -105,7 +105,7 @@ public class TestCompressionStorages { meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName()); Path tablePath = new Path(testDir, "SplitCompression"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.enableStats(); appender.init(); @@ -138,7 +138,7 @@ public class TestCompressionStorages { tablets[0] = new Fragment("SplitCompression", tablePath, meta, 0, randomNum); tablets[1] = new Fragment("SplitCompression", tablePath, meta, randomNum, (fileLen - randomNum)); - Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema); scanner.init(); int tupleCnt = 0; Tuple tuple; @@ -147,7 +147,7 @@ public class TestCompressionStorages { } scanner.close(); - scanner = StorageManager.getScanner(conf, meta, tablets[1], schema); + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema); scanner.init(); while ((tuple = scanner.next()) != null) { tupleCnt++; @@ -167,7 +167,7 @@ public class TestCompressionStorages { String fileName = "Compression_" + codec.getSimpleName(); Path tablePath = new Path(testDir, fileName); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.enableStats(); appender.init(); @@ -196,7 +196,7 @@ public class TestCompressionStorages { Fragment[] tablets = new Fragment[1]; tablets[0] = new Fragment(fileName, tablePath, meta, 0, fileLen); - Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema); scanner.init(); int tupleCnt = 0; Tuple tuple; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 7c40d3d..b869dbb 100644 --- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -21,11 +21,6 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameters; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Options; import org.apache.tajo.catalog.Schema; @@ -38,6 +33,11 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.TUtil; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import java.io.IOException; import java.util.Arrays; @@ -48,7 +48,7 @@ import static org.junit.Assert.assertEquals; @RunWith(Parameterized.class) public class TestMergeScanner { private TajoConf conf; - StorageManager sm; + AbstractStorageManager sm; private static String TEST_PATH = "target/test-data/TestMergeScanner"; private Path testDir; private StoreType storeType; @@ -77,7 +77,7 @@ public class TestMergeScanner { conf.setVar(ConfVars.ROOT_DIR, TEST_PATH); testDir = CommonTestingUtil.getTestDir(TEST_PATH); fs = testDir.getFileSystem(conf); - sm = StorageManager.get(conf, testDir); + sm = StorageManagerFactory.getStorageManager(conf, testDir); } @Test @@ -92,7 +92,7 @@ public class TestMergeScanner { TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options); Path table1Path = new Path(testDir, storeType + "_1.data"); - Appender appender1 = StorageManager.getAppender(conf, meta, table1Path); + Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, table1Path); appender1.enableStats(); appender1.init(); int tupleNum = 10000; @@ -114,7 +114,7 @@ public class TestMergeScanner { } Path table2Path = new Path(testDir, storeType + "_2.data"); - Appender appender2 = StorageManager.getAppender(conf, meta, table2Path); + Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, table2Path); appender2.enableStats(); appender2.init(); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java index 4c4462f..5881432 100644 --- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java +++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java @@ -20,9 +20,6 @@ package org.apache.tajo.storage; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; @@ -32,6 +29,9 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; @@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals; public class TestStorageManager { private TajoConf conf; private static String TEST_PATH = "target/test-data/TestStorageManager"; - StorageManager sm = null; + AbstractStorageManager sm = null; private Path testDir; private FileSystem fs; @Before @@ -48,7 +48,7 @@ public class TestStorageManager { conf = new TajoConf(); testDir = CommonTestingUtil.getTestDir(TEST_PATH); fs = testDir.getFileSystem(conf); - sm = StorageManager.get(conf, testDir); + sm = StorageManagerFactory.getStorageManager(conf, testDir); } @After @@ -75,14 +75,14 @@ public class TestStorageManager { Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv"); fs.mkdirs(path.getParent()); - Appender appender = StorageManager.getAppender(conf, meta, path); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path); appender.init(); for(Tuple t : tuples) { appender.addTuple(t); } appender.close(); - Scanner scanner = StorageManager.getScanner(conf, meta, path); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, path); scanner.init(); int i=0; while(scanner.next() != null) { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index bde9835..364600c 100644 --- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -91,7 +91,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(schema, storeType); Path tablePath = new Path(testDir, "Splitable.data"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.enableStats(); appender.init(); int tupleNum = 10000; @@ -115,7 +115,7 @@ public class TestStorages { tablets[0] = new Fragment("Splitable", tablePath, meta, 0, randomNum); tablets[1] = new Fragment("Splitable", tablePath, meta, randomNum, (fileLen - randomNum)); - Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema); scanner.init(); int tupleCnt = 0; while (scanner.next() != null) { @@ -123,7 +123,7 @@ public class TestStorages { } scanner.close(); - scanner = StorageManager.getScanner(conf, meta, tablets[1], schema); + scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema); scanner.init(); while (scanner.next() != null) { tupleCnt++; @@ -144,7 +144,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(schema, storeType); Path tablePath = new Path(testDir, "testProjection.data"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); int tupleNum = 10000; VTuple vTuple; @@ -164,7 +164,7 @@ public class TestStorages { Schema target = new Schema(); target.addColumn("age", Type.INT8); target.addColumn("score", Type.FLOAT4); - Scanner scanner = StorageManager.getScanner(conf, meta, fragment, target); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment, target); scanner.init(); int tupleCnt = 0; Tuple tuple; @@ -201,7 +201,7 @@ public class TestStorages { TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options); Path tablePath = new Path(testDir, "testVariousTypes.data"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple = new VTuple(12); @@ -225,7 +225,7 @@ public class TestStorages { FileStatus status = fs.getFileStatus(tablePath); Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen()); - Scanner scanner = StorageManager.getScanner(conf, meta, fragment); + Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment); scanner.init(); Tuple retrieved; http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java index d97a27c..97123c6 100644 --- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java +++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java @@ -21,8 +21,6 @@ package org.apache.tajo.storage.index; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.Before; -import org.junit.Test; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; @@ -32,7 +30,10 @@ import org.apache.tajo.storage.*; import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; +import org.apache.tajo.storage.v2.FileScannerV2; import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; @@ -73,7 +74,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(schema, StoreType.CSV); Path tablePath = new Path(testDir, "FindValueInCSV.csv"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i ++ ) { @@ -108,8 +109,9 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + Tuple keyTuple; long offset; while (true) { @@ -130,8 +132,11 @@ public class TestBSTIndex { tuple = new VTuple(keySchema.getColumnNum()); BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp); reader.open(); - scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + if(scanner instanceof FileScannerV2) { + ((FileScannerV2)scanner).waitScanStart(); + } for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { tuple.put(0, DatumFactory.createInt8(i)); tuple.put(1, DatumFactory.createFloat8(i)); @@ -157,7 +162,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(schema, StoreType.CSV); Path tablePath = new Path(testDir, "BuildIndexWithAppender.csv"); - FileAppender appender = (FileAppender) StorageManager.getAppender(conf, meta, tablePath); + FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); SortSpec [] sortKeys = new SortSpec[2]; @@ -205,8 +210,9 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"), keySchema, comp); reader.open(); - SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { tuple.put(0, DatumFactory.createInt8(i)); tuple.put(1, DatumFactory.createFloat8(i)); @@ -232,7 +238,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(schema, StoreType.CSV); Path tablePath = StorageUtil.concatPath(testDir, "FindOmittedValueInCSV.csv"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i += 2 ) { @@ -265,8 +271,9 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + Tuple keyTuple; long offset; while (true) { @@ -299,7 +306,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(schema, StoreType.CSV); Path tablePath = new Path(testDir, "FindNextKeyValueInCSV.csv"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i ++ ) { @@ -332,9 +339,10 @@ public class TestBSTIndex { BSTIndex.TWO_LEVEL_INDEX, keySchema, comp); creater.setLoadNum(LOAD_NUM); creater.open(); - - SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + + SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + Tuple keyTuple; long offset; while (true) { @@ -355,8 +363,9 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"), keySchema, comp); reader.open(); - scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + Tuple result; for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) { keyTuple = new VTuple(2); @@ -385,7 +394,7 @@ public class TestBSTIndex { meta = CatalogUtil.newTableMeta(schema, StoreType.CSV); Path tablePath = new Path(testDir, "FindNextKeyOmittedValueInCSV.csv"); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i+=2) { @@ -419,8 +428,9 @@ public class TestBSTIndex { creater.setLoadNum(LOAD_NUM); creater.open(); - SeekableScanner scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + Tuple keyTuple; long offset; while (true) { @@ -441,8 +451,13 @@ public class TestBSTIndex { BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInCSV.idx"), keySchema, comp); reader.open(); - scanner = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet)); + scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema()); scanner.init(); + + if(scanner instanceof FileScannerV2) { + ((FileScannerV2)scanner).waitScanStart(); + } + Tuple result; for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) { keyTuple = new VTuple(2); http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java index 18a69e6..72bdbb0 100644 --- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java +++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java @@ -21,8 +21,6 @@ package org.apache.tajo.storage.index; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.junit.Before; -import org.junit.Test; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; @@ -34,12 +32,14 @@ import org.apache.tajo.storage.index.bst.BSTIndex; import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader; import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter; import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Before; +import org.junit.Test; import java.io.IOException; +import static org.apache.tajo.storage.CSVFile.CSVScanner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.apache.tajo.storage.CSVFile.CSVScanner; public class TestSingleCSVFileBSTIndex { @@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex { Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple; for (int i = 0; i < TUPLE_NUM; i++) { @@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex { Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV", "table1.csv"); fs.mkdirs(tablePath.getParent()); - Appender appender = StorageManager.getAppender(conf, meta, tablePath); + Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath); appender.init(); Tuple tuple; for(int i = 0 ; i < TUPLE_NUM; i ++ ) { http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml b/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml index 7111eae..637e2f6 100644 --- a/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml +++ b/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml @@ -20,6 +20,11 @@ --> + + tajo.storage.manager.v2 + false + + tajo.storage.scanner-handler @@ -32,25 +37,50 @@ + tajo.storage.scanner-handler.v2.csv.class + org.apache.tajo.storage.v2.CSVFileScanner + + + tajo.storage.scanner-handler.raw.class org.apache.tajo.storage.RawFile$RawFileScanner + tajo.storage.scanner-handler.v2.raw.class + org.apache.tajo.storage.RawFile$RawFileScanner + + + tajo.storage.scanner-handler.rcfile.class org.apache.tajo.storage.rcfile.RCFileWrapper$RCFileScanner + tajo.storage.scanner-handler.v2.rcfile.class + org.apache.tajo.storage.v2.RCFileScanner + + + tajo.storage.scanner-handler.rowfile.class org.apache.tajo.storage.RowFile$RowFileScanner + tajo.storage.scanner-handler.v2.rowfile.class + org.apache.tajo.storage.RowFile$RowFileScanner + + + tajo.storage.scanner-handler.trevni.class org.apache.tajo.storage.trevni.TrevniScanner + + tajo.storage.scanner-handler.v2.trevni.class + org.apache.tajo.storage.trevni.TrevniScanner + + tajo.storage.appender-handler