tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [1/4] TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik)
Date Fri, 13 Sep 2013 03:29:56 GMT
Updated Branches:
  refs/heads/master 5d3966a8c -> 5ad7fbae9


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml b/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
index 7111eae..f56413a 100644
--- a/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
+++ b/tajo-core/tajo-core-storage/src/main/resources/storage-default.xml
@@ -20,37 +20,86 @@
   -->
 
 <configuration>
+  <property>
+    <name>tajo.storage.manager.v2</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>tajo.storage.manager.maxReadBytes</name>
+    <value>8388608</value>
+    <description></description>
+  </property>
+
+  <property>
+    <name>tajo.storage.manager.concurrency.perDisk</name>
+    <value>1</value>
+    <description></description>
+  </property>
+
   <!--- Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
     <value>csv,raw,rcfile,row,trevni</value>
   </property>
 
+  <!--
+  <property>
+    <name>tajo.storage.scanner-handler.csv.class</name>
+    <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
+  </property>
+  -->
+
   <property>
     <name>tajo.storage.scanner-handler.csv.class</name>
     <value>org.apache.tajo.storage.CSVFile$CSVScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.csv.class</name>
+    <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFileWrapper$RCFileScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
+    <value>org.apache.tajo.storage.v2.RCFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.rowfile.class</name>
     <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.trevni.class</name>
     <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.v2.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 42c68b6..401bd9e 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -105,7 +105,7 @@ public class TestCompressionStorages {
     meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "SplitCompression");
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -138,7 +138,7 @@ public class TestCompressionStorages {
     tablets[0] = new Fragment("SplitCompression", tablePath, meta, 0, randomNum);
     tablets[1] = new Fragment("SplitCompression", tablePath, meta, randomNum, (fileLen - randomNum));
 
-    Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
     scanner.init();
     int tupleCnt = 0;
     Tuple tuple;
@@ -147,7 +147,7 @@ public class TestCompressionStorages {
     }
     scanner.close();
 
-    scanner = StorageManager.getScanner(conf, meta, tablets[1], schema);
+    scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema);
     scanner.init();
     while ((tuple = scanner.next()) != null) {
       tupleCnt++;
@@ -167,7 +167,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -196,7 +196,7 @@ public class TestCompressionStorages {
     Fragment[] tablets = new Fragment[1];
     tablets[0] = new Fragment(fileName, tablePath, meta, 0, fileLen);
 
-    Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
     scanner.init();
     int tupleCnt = 0;
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 7c40d3d..b869dbb 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -21,11 +21,6 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Options;
 import org.apache.tajo.catalog.Schema;
@@ -38,6 +33,11 @@ import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -48,7 +48,7 @@ import static org.junit.Assert.assertEquals;
 @RunWith(Parameterized.class)
 public class TestMergeScanner {
   private TajoConf conf;
-  StorageManager sm;
+  AbstractStorageManager sm;
   private static String TEST_PATH = "target/test-data/TestMergeScanner";
   private Path testDir;
   private StoreType storeType;
@@ -77,7 +77,7 @@ public class TestMergeScanner {
     conf.setVar(ConfVars.ROOT_DIR, TEST_PATH);
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
   }
   
   @Test
@@ -92,7 +92,7 @@ public class TestMergeScanner {
     TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options);
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = StorageManager.getAppender(conf, meta, table1Path);
+    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -114,7 +114,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = StorageManager.getAppender(conf, meta, table2Path);
+    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, table2Path);
     appender2.enableStats();
     appender2.init();
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
index 4c4462f..5881432 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -20,9 +20,6 @@ package org.apache.tajo.storage;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
@@ -32,6 +29,9 @@ import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 
@@ -40,7 +40,7 @@ import static org.junit.Assert.assertEquals;
 public class TestStorageManager {
 	private TajoConf conf;
 	private static String TEST_PATH = "target/test-data/TestStorageManager";
-	StorageManager sm = null;
+	AbstractStorageManager sm = null;
   private Path testDir;
   private FileSystem fs;
 	@Before
@@ -48,7 +48,7 @@ public class TestStorageManager {
 		conf = new TajoConf();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManager.get(conf, testDir);
+    sm = StorageManagerFactory.getStorageManager(conf, testDir);
 	}
 
 	@After
@@ -75,14 +75,14 @@ public class TestStorageManager {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
     fs.mkdirs(path.getParent());
-		Appender appender = StorageManager.getAppender(conf, meta, path);
+		Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, path);
     appender.init();
 		for(Tuple t : tuples) {
 		  appender.addTuple(t);
 		}
 		appender.close();
 
-		Scanner scanner = StorageManager.getScanner(conf, meta, path);
+		Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, path);
     scanner.init();
 		int i=0;
 		while(scanner.next() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index bde9835..364600c 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -91,7 +91,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
       appender.enableStats();
       appender.init();
       int tupleNum = 10000;
@@ -115,7 +115,7 @@ public class TestStorages {
       tablets[0] = new Fragment("Splitable", tablePath, meta, 0, randomNum);
       tablets[1] = new Fragment("Splitable", tablePath, meta, randomNum, (fileLen - randomNum));
 
-      Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[0], schema);
       scanner.init();
       int tupleCnt = 0;
       while (scanner.next() != null) {
@@ -123,7 +123,7 @@ public class TestStorages {
       }
       scanner.close();
 
-      scanner = StorageManager.getScanner(conf, meta, tablets[1], schema);
+      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, tablets[1], schema);
       scanner.init();
       while (scanner.next() != null) {
         tupleCnt++;
@@ -144,7 +144,7 @@ public class TestStorages {
     TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
 
     Path tablePath = new Path(testDir, "testProjection.data");
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     int tupleNum = 10000;
     VTuple vTuple;
@@ -164,7 +164,7 @@ public class TestStorages {
     Schema target = new Schema();
     target.addColumn("age", Type.INT8);
     target.addColumn("score", Type.FLOAT4);
-    Scanner scanner = StorageManager.getScanner(conf, meta, fragment, target);
+    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment, target);
     scanner.init();
     int tupleCnt = 0;
     Tuple tuple;
@@ -201,7 +201,7 @@ public class TestStorages {
     TableMeta meta = CatalogUtil.newTableMeta(schema, storeType, options);
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
 
     Tuple tuple = new VTuple(12);
@@ -225,7 +225,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     Fragment fragment = new Fragment("table", tablePath, meta, 0, status.getLen());
-    Scanner scanner =  StorageManager.getScanner(conf, meta, fragment);
+    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, fragment);
     scanner.init();
 
     Tuple retrieved;

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index d97a27c..97123c6 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -21,8 +21,6 @@ package org.apache.tajo.storage.index;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -32,7 +30,10 @@ import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
 import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
+import org.apache.tajo.storage.v2.FileScannerV2;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 
@@ -73,7 +74,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     
     Path tablePath = new Path(testDir, "FindValueInCSV.csv");
-    Appender appender  = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender  = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
@@ -108,8 +109,9 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -130,8 +132,11 @@ public class TestBSTIndex {
     tuple = new VTuple(keySchema.getColumnNum());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindValueInCSV.idx"), keySchema, comp);
     reader.open();
-    scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+    if(scanner instanceof FileScannerV2) {
+      ((FileScannerV2)scanner).waitScanStart();
+    }
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
@@ -157,7 +162,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
 
     Path tablePath = new Path(testDir, "BuildIndexWithAppender.csv");
-    FileAppender appender  = (FileAppender) StorageManager.getAppender(conf, meta, tablePath);
+    FileAppender appender  = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
 
     SortSpec [] sortKeys = new SortSpec[2];
@@ -205,8 +210,9 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "BuildIndexWithAppender.idx"),
         keySchema, comp);
     reader.open();
-    SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       tuple.put(0, DatumFactory.createInt8(i));
       tuple.put(1, DatumFactory.createFloat8(i));
@@ -232,7 +238,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
     
     Path tablePath = StorageUtil.concatPath(testDir, "FindOmittedValueInCSV.csv");
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i += 2 ) {
@@ -265,8 +271,9 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    SeekableScanner scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -299,7 +306,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
 
     Path tablePath = new Path(testDir, "FindNextKeyValueInCSV.csv");
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {
@@ -332,9 +339,10 @@ public class TestBSTIndex {
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);
     creater.setLoadNum(LOAD_NUM);
     creater.open();
-    
-    SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+
+    SeekableScanner scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -355,8 +363,9 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyValueInCSV.idx"),
         keySchema, comp);
     reader.open();
-    scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
     Tuple result;
     for(int i = 0 ; i < TUPLE_NUM -1 ; i ++) {
       keyTuple = new VTuple(2);
@@ -385,7 +394,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
 
     Path tablePath = new Path(testDir, "FindNextKeyOmittedValueInCSV.csv");
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i+=2) {
@@ -419,8 +428,9 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    SeekableScanner scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
     Tuple keyTuple;
     long offset;
     while (true) {
@@ -441,8 +451,13 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "FindNextKeyOmittedValueInCSV.idx"),
         keySchema, comp);
     reader.open();
-    scanner  = (SeekableScanner)(StorageManager.getScanner(conf, meta, tablet));
+    scanner  = StorageManagerFactory.getSeekableScanner(conf, meta, tablet, meta.getSchema());
     scanner.init();
+
+    if(scanner instanceof FileScannerV2) {
+      ((FileScannerV2)scanner).waitScanStart();
+    }
+
     Tuple result;
     for(int i = 1 ; i < TUPLE_NUM -1 ; i+=2) {
       keyTuple = new VTuple(2);

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 18a69e6..72bdbb0 100644
--- a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -21,8 +21,6 @@ package org.apache.tajo.storage.index;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.junit.Before;
-import org.junit.Test;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
 import org.apache.tajo.common.TajoDataTypes.Type;
@@ -34,12 +32,14 @@ import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexReader;
 import org.apache.tajo.storage.index.bst.BSTIndex.BSTIndexWriter;
 import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.tajo.storage.CSVFile.CSVScanner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
-import static org.apache.tajo.storage.CSVFile.CSVScanner;
 
 public class TestSingleCSVFileBSTIndex {
   
@@ -77,7 +77,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
     fs.mkdirs(tablePath.getParent());
 
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -166,7 +166,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
         "table1.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml b/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
index 7111eae..637e2f6 100644
--- a/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
+++ b/tajo-core/tajo-core-storage/src/test/resources/storage-default.xml
@@ -20,6 +20,11 @@
   -->
 
 <configuration>
+  <property>
+    <name>tajo.storage.manager.v2</name>
+    <value>false</value>
+  </property>
+
   <!--- Scanner Handler -->
   <property>
     <name>tajo.storage.scanner-handler</name>
@@ -32,25 +37,50 @@
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.csv.class</name>
+    <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.raw.class</name>
+    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFileWrapper$RCFileScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
+    <value>org.apache.tajo.storage.v2.RCFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.rowfile.class</name>
     <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
   </property>
 
   <property>
+    <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
+    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
+  </property>
+
+  <property>
     <name>tajo.storage.scanner-handler.trevni.class</name>
     <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
   </property>
 
+  <property>
+    <name>tajo.storage.scanner-handler.v2.trevni.class</name>
+    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
+  </property>
+
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>


Mime
View raw message