tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [29/32] tajo git commit: TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)
Date Fri, 12 Dec 2014 06:00:03 GMT
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index a5d1dc5..e976456 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -76,7 +76,7 @@ public class TestNLJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -86,7 +86,8 @@ public class TestNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, schema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < 50; i++) {
@@ -109,7 +110,8 @@ public class TestNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 50; i += 2) {
@@ -144,9 +146,9 @@ public class TestNLJoinExec {
   
   @Test
   public final void testNLCrossJoin() throws IOException, PlanningException {
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()), Integer.MAX_VALUE);
     
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -159,7 +161,7 @@ public class TestNLJoinExec {
     LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
         expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     int i = 0;
@@ -173,9 +175,9 @@ public class TestNLJoinExec {
 
   @Test
   public final void testNLInnerJoin() throws IOException, PlanningException {
-    FileFragment[] empFrags = StorageManager.splitNG(conf, "default.e", employee.getMeta(),
+    FileFragment[] empFrags = FileStorageManager.splitNG(conf, "default.e", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
-    FileFragment[] peopleFrags = StorageManager.splitNG(conf, "default.p", people.getMeta(),
+    FileFragment[] peopleFrags = FileStorageManager.splitNG(conf, "default.p", people.getMeta(),
         new Path(people.getPath()), Integer.MAX_VALUE);
     
     FileFragment[] merged = TUtil.concat(empFrags, peopleFrags);
@@ -188,7 +190,7 @@ public class TestNLJoinExec {
     LogicalNode plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf),
         expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     
     Tuple tuple;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index d507b97..cce4ba7 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -51,10 +51,12 @@ import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.expr.AggregationFunctionCallEval;
 import org.apache.tajo.plan.logical.*;
+import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -78,7 +80,6 @@ import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME;
 import static org.apache.tajo.TajoConstants.DEFAULT_TABLESPACE_NAME;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.ColumnPartitionEnforcer.ColumnPartitionAlgorithm;
 import static org.apache.tajo.ipc.TajoWorkerProtocol.SortEnforce.SortAlgorithm;
-import static org.apache.tajo.plan.serder.PlanProto.ShuffleType;
 import static org.junit.Assert.*;
 
 public class TestPhysicalPlanner {
@@ -88,7 +89,7 @@ public class TestPhysicalPlanner {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static StorageManager sm;
+  private static FileStorageManager sm;
   private static Path testDir;
   private static Session session = LocalTajoTestingUtility.createDummySession();
   private static QueryContext defaultContext;
@@ -106,7 +107,7 @@ public class TestPhysicalPlanner {
     util.startCatalogCluster();
     conf = util.getConfiguration();
     testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, testDir);
     catalog = util.getMiniCatalogCluster().getCatalog();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -129,8 +130,7 @@ public class TestPhysicalPlanner {
 
 
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
-        employeePath);
+    Appender appender = sm.getAppender(employeeMeta, employeeSchema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
     for (int i = 0; i < 100; i++) {
@@ -148,7 +148,7 @@ public class TestPhysicalPlanner {
 
     Path scorePath = new Path(testDir, "score");
     TableMeta scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV, new KeyValueSet());
-    appender = StorageManager.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, scorePath);
+    appender = sm.getAppender(scoreMeta, scoreSchema, scorePath);
     appender.init();
     score = new TableDesc(
         CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta,
@@ -189,8 +189,8 @@ public class TestPhysicalPlanner {
 
     Schema scoreSchmea = score.getSchema();
     TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea,
-        scoreLargePath);
+    Appender appender =  ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(scoreLargeMeta, scoreSchmea, scoreLargePath);
     appender.enableStats();
     appender.init();
     largeScore = new TableDesc(
@@ -246,7 +246,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCreateScanPlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -259,7 +259,7 @@ public class TestPhysicalPlanner {
     optimizer.optimize(plan);
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     Tuple tuple;
@@ -277,7 +277,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCreateScanWithFilterPlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateScanWithFilterPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -290,7 +290,7 @@ public class TestPhysicalPlanner {
     optimizer.optimize(plan);
 
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     Tuple tuple;
@@ -306,7 +306,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByPlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -318,7 +318,7 @@ public class TestPhysicalPlanner {
     optimizer.optimize(plan);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     int i = 0;
@@ -337,7 +337,7 @@ public class TestPhysicalPlanner {
   @Test
   public final void testHashGroupByPlanWithALLField() throws IOException, PlanningException {
     // TODO - currently, this query does not use hash-based group operator.
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir(
         "target/test-data/testHashGroupByPlanWithALLField");
@@ -349,7 +349,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     int i = 0;
@@ -367,7 +367,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testSortGroupByPlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortGroupByPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -378,7 +378,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, context);
     optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan.getRootBlock().getRoot());
 
     /*HashAggregateExec hashAgg = (HashAggregateExec) exec;
@@ -430,7 +430,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testStorePlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -443,17 +443,16 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
-        ctx.getOutputPath());
+    Scanner scanner =  ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getFileScanner(outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
     int i = 0;
@@ -477,7 +476,7 @@ public class TestPhysicalPlanner {
     TableStats stats = largeScore.getStats();
     assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB);
 
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
         new Path(largeScore.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithMaxOutputFileSize");
 
@@ -497,7 +496,7 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode = optimizer.optimize(plan);
 
     // executing StoreTableExec
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -512,7 +511,7 @@ public class TestPhysicalPlanner {
     // checking the file contents
     long totalNum = 0;
     for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
-      Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(
+      Scanner scanner =  ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(
           CatalogUtil.newTableMeta(StoreType.CSV),
           rootNode.getOutSchema(),
           status.getPath());
@@ -528,7 +527,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testStorePlanWithRCFile() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithRCFile");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -543,14 +542,14 @@ public class TestPhysicalPlanner {
 
     TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.RCFILE);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
-        ctx.getOutputPath());
+    Scanner scanner = ((FileStorageManager)StorageManager.getFileStorageManager(conf)).getFileScanner(
+        outputMeta, rootNode.getOutSchema(), ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
     int i = 0;
@@ -569,7 +568,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testEnforceForDefaultColumnPartitionStorePlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -581,7 +580,7 @@ public class TestPhysicalPlanner {
     Expr context = analyzer.parse(CreateTableAsStmts[2]);
     LogicalPlan plan = planner.createPlan(defaultContext, context);
     LogicalNode rootNode = optimizer.optimize(plan);
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     assertTrue(exec instanceof SortBasedColPartitionStoreExec);
   }
@@ -596,7 +595,7 @@ public class TestPhysicalPlanner {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.HASH_PARTITION);
 
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -605,7 +604,7 @@ public class TestPhysicalPlanner {
     ctx.setEnforcer(enforcer);
     ctx.setOutputPath(new Path(workDir, "grouped4"));
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     assertTrue(exec instanceof HashBasedColPartitionStoreExec);
   }
@@ -620,7 +619,7 @@ public class TestPhysicalPlanner {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceColumnPartitionAlgorithm(createTableNode.getPID(), ColumnPartitionAlgorithm.SORT_PARTITION);
 
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -629,14 +628,14 @@ public class TestPhysicalPlanner {
     ctx.setEnforcer(enforcer);
     ctx.setOutputPath(new Path(workDir, "grouped5"));
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     assertTrue(exec instanceof SortBasedColPartitionStoreExec);
   }
 
   @Test
   public final void testPartitionedStorePlan() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] },
@@ -660,7 +659,7 @@ public class TestPhysicalPlanner {
     QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId();
     ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -671,7 +670,7 @@ public class TestPhysicalPlanner {
     Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir);
     FileStatus [] list = fs.listStatus(queryLocalTmpDir);
 
-    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    List<Fragment> fragments = new ArrayList<Fragment>();
     for (FileStatus status : list) {
       assertTrue(status.isDirectory());
       FileStatus [] files = fs.listStatus(status.getPath());
@@ -705,7 +704,7 @@ public class TestPhysicalPlanner {
   public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException {
 
     // Preparing working dir and input fragments
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
         new Path(largeScore.getPath()), Integer.MAX_VALUE);
     QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlanWithMaxFileSize");
@@ -724,7 +723,7 @@ public class TestPhysicalPlanner {
     LogicalNode rootNode = optimizer.optimize(plan);
 
     // Executing CREATE TABLE PARTITION BY
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -735,7 +734,7 @@ public class TestPhysicalPlanner {
     // checking the number of partitions
     assertEquals(2, list.length);
 
-    List<FileFragment> fragments = Lists.newArrayList();
+    List<Fragment> fragments = Lists.newArrayList();
     int i = 0;
     for (FileStatus status : list) {
       assertTrue(status.isDirectory());
@@ -769,7 +768,7 @@ public class TestPhysicalPlanner {
   @Test
   public final void testPartitionedStorePlanWithEmptyGroupingSet()
       throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
 
@@ -794,7 +793,7 @@ public class TestPhysicalPlanner {
     QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId();
     ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -805,7 +804,7 @@ public class TestPhysicalPlanner {
     Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir);
     FileStatus [] list = fs.listStatus(queryLocalTmpDir);
 
-    List<FileFragment> fragments = new ArrayList<FileFragment>();
+    List<Fragment> fragments = new ArrayList<Fragment>();
     for (FileStatus status : list) {
       assertTrue(status.isDirectory());
       FileStatus [] files = fs.listStatus(status.getPath());
@@ -836,7 +835,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testAggregationFunction() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testAggregationFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -853,7 +852,7 @@ public class TestPhysicalPlanner {
       function.setFirstPhase();
     }
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     exec.init();
@@ -867,7 +866,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testCountFunction() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCountFunction");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -884,7 +883,7 @@ public class TestPhysicalPlanner {
       function.setFirstPhase();
     }
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     Tuple tuple = exec.next();
@@ -895,7 +894,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByWithNullValue() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByWithNullValue");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -906,7 +905,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     int count = 0;
@@ -920,7 +919,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testUnionPlan() throws IOException, PlanningException, CloneNotSupportedException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testUnionPlan");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -936,7 +935,7 @@ public class TestPhysicalPlanner {
     union.setRightChild((LogicalNode) root.getChild().clone());
     root.setChild(union);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, root);
 
     int count = 0;
@@ -958,7 +957,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     Tuple tuple;
     exec.init();
@@ -971,7 +970,7 @@ public class TestPhysicalPlanner {
     plan = planner.createPlan(defaultContext, expr);
     rootNode = optimizer.optimize(plan);
 
-    phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    phyPlanner = new PhysicalPlannerImpl(conf);
     exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     tuple = exec.next();
@@ -985,7 +984,7 @@ public class TestPhysicalPlanner {
 
   //@Test
   public final void testCreateIndex() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testCreateIndex");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -995,7 +994,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     while (exec.next() != null) {
@@ -1012,7 +1011,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testDuplicateEliminate() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(),
         new Path(score.getPath()), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testDuplicateEliminate");
@@ -1024,7 +1023,7 @@ public class TestPhysicalPlanner {
     LogicalPlan plan = planner.createPlan(defaultContext, expr);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     Tuple tuple;
 
@@ -1046,7 +1045,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testIndexedStoreExec() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testIndexedStoreExec");
@@ -1064,7 +1063,7 @@ public class TestPhysicalPlanner {
     channel.setShuffleKeys(PlannerUtil.sortSpecsToSchema(sortNode.getSortKeys()).toArray());
     ctx.setDataChannel(channel);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     Tuple tuple;
@@ -1084,7 +1083,7 @@ public class TestPhysicalPlanner {
     Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
     TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet());
     SeekableScanner scanner =
-        StorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
+        FileStorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
     scanner.init();
 
     int cnt = 0;
@@ -1139,7 +1138,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testSortEnforcer() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testSortEnforcer");
@@ -1157,7 +1156,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -1179,7 +1178,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    phyPlanner = new PhysicalPlannerImpl(conf);
     exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -1190,7 +1189,7 @@ public class TestPhysicalPlanner {
 
   @Test
   public final void testGroupByEnforcer() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.score", score.getMeta(), new Path(score.getPath()),
         Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testGroupByEnforcer");
     Expr context = analyzer.parse(QUERIES[7]);
@@ -1207,7 +1206,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();
@@ -1229,7 +1228,7 @@ public class TestPhysicalPlanner {
         new FileFragment[] {frags[0]}, workDir);
     ctx.setEnforcer(enforcer);
 
-    phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    phyPlanner = new PhysicalPlannerImpl(conf);
     exec = phyPlanner.createPlan(ctx, rootNode);
     exec.init();
     exec.next();

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index a23a2d1..3c78b12 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -80,7 +80,7 @@ public class TestProgressExternalSortExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", TajoDataTypes.Type.INT4);
@@ -89,7 +89,8 @@ public class TestProgressExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
     Tuple tuple = new VTuple(schema.size());
@@ -136,7 +137,7 @@ public class TestProgressExternalSortExec {
   }
 
   private void testProgress(int sortBufferBytesNum) throws Exception {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employee.getMeta(),
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employee.getMeta(),
         new Path(employee.getPath()), Integer.MAX_VALUE);
     Path workDir = new Path(testDir, TestExternalSortExec.class.getName());
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
@@ -146,7 +147,7 @@ public class TestProgressExternalSortExec {
     LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), expr);
     LogicalNode rootNode = plan.getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -156,8 +157,7 @@ public class TestProgressExternalSortExec {
       UnaryPhysicalExec sortExec = proj.getChild();
       SeqScanExec scan = sortExec.getChild();
 
-      ExternalSortExec extSort = new ExternalSortExec(ctx, sm,
-          ((MemSortExec)sortExec).getPlan(), scan);
+      ExternalSortExec extSort = new ExternalSortExec(ctx, ((MemSortExec)sortExec).getPlan(), scan);
 
       extSort.setSortBufferBytesNum(sortBufferBytesNum);
       proj.setChild(extSort);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index 9ebe871..879ca21 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -36,10 +36,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.plan.LogicalPlanner;
 import org.apache.tajo.plan.PlanningException;
 import org.apache.tajo.plan.logical.LogicalNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManager;
-import org.apache.tajo.storage.Tuple;
-import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.TUtil;
@@ -83,7 +80,7 @@ public class TestRightOuterHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -106,7 +103,8 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -135,7 +133,8 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -174,7 +173,8 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -232,9 +232,9 @@ public class TestRightOuterHashJoinExec {
 
   @Test
   public final void testRightOuter_HashJoinExec0() throws IOException, PlanningException {
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
@@ -246,7 +246,7 @@ public class TestRightOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[0]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -273,9 +273,9 @@ public class TestRightOuterHashJoinExec {
 
   @Test
   public final void testRightOuter_HashJoinExec1() throws IOException, PlanningException {
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(emp3Frags, job3Frags);
@@ -287,7 +287,7 @@ public class TestRightOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[1]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -314,9 +314,9 @@ public class TestRightOuterHashJoinExec {
     @Test
   public final void testRightOuter_HashJoinExec2() throws IOException, PlanningException {
     
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] job3Frags = StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
+    FileFragment[] job3Frags = FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()),
         Integer.MAX_VALUE);
 
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
@@ -328,7 +328,7 @@ public class TestRightOuterHashJoinExec {
     Expr expr = analyzer.parse(QUERIES[2]);
     LogicalNode plan = planner.createPlan(defaultContext, expr).getRootBlock().getRoot();
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 4956b7f..8bc00cc 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -87,7 +87,7 @@ public class TestRightOuterMergeJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManager.getStorageManager(conf, testDir);
+    sm = StorageManager.getFileStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -110,7 +110,8 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -148,7 +149,8 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    Appender appender4 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     Tuple tuple4 = new VTuple(dep4Schema.size());
     for (int i = 0; i < 11; i++) {
@@ -179,7 +181,8 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -218,7 +221,8 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -270,8 +274,8 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
-        phone3Path);
+    Appender appender5 = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(phone3Meta, phone3Schema, phone3Path);
     appender5.init();
 
     appender5.flush();
@@ -313,9 +317,9 @@ public class TestRightOuterMergeJoinExec {
     Enforcer enforcer = new Enforcer();
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
-    FileFragment[] emp3Frags = StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
+    FileFragment[] emp3Frags = FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()),
         Integer.MAX_VALUE);
-    FileFragment[] dep3Frags = StorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
+    FileFragment[] dep3Frags = FileStorageManager.splitNG(conf, DEP3_NAME, dep3.getMeta(), new Path(dep3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, dep3Frags);
 
@@ -324,7 +328,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -351,9 +355,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] job3Frags =
-        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin1");
@@ -361,7 +365,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -387,9 +391,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] job3Frags =
-        StorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, JOB3_NAME, job3.getMeta(), new Path(job3.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(job3Frags, emp3Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuterMergeJoin2");
@@ -397,7 +401,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     ProjectionExec proj = (ProjectionExec) exec;
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
@@ -423,9 +427,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] dep4Frags =
-        StorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, DEP4_NAME, dep4.getMeta(), new Path(dep4.getPath()), Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, dep4Frags);
 
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/testRightOuter_MergeJoin3");
@@ -433,7 +437,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
 
     ProjectionExec proj = (ProjectionExec) exec;
@@ -460,9 +464,9 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
     FileFragment[] phone3Frags =
-        StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+        FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(emp3Frags, phone3Frags);
 
@@ -471,7 +475,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     ProjectionExec proj = (ProjectionExec) exec;
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);
@@ -497,8 +501,8 @@ public class TestRightOuterMergeJoinExec {
     enforcer.enforceJoinAlgorithm(joinNode.getPID(), JoinAlgorithm.MERGE_JOIN);
 
     FileFragment[] emp3Frags =
-        StorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
-    FileFragment[] phone3Frags = StorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
+        FileStorageManager.splitNG(conf, EMP3_NAME, emp3.getMeta(), new Path(emp3.getPath()), Integer.MAX_VALUE);
+    FileFragment[] phone3Frags = FileStorageManager.splitNG(conf, PHONE3_NAME, phone3.getMeta(), new Path(phone3.getPath()),
         Integer.MAX_VALUE);
     FileFragment[] merged = TUtil.concat(phone3Frags,emp3Frags);
 
@@ -508,7 +512,7 @@ public class TestRightOuterMergeJoinExec {
         LocalTajoTestingUtility.newQueryUnitAttemptId(), merged, workDir);
     ctx.setEnforcer(enforcer);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, plan);
     ProjectionExec proj = (ProjectionExec) exec;
     assertTrue(proj.getChild() instanceof RightOuterMergeJoinExec);

http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
index afa7430..8a61cab 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestSortExec.java
@@ -57,7 +57,7 @@ public class TestSortExec {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static StorageManager sm;
+  private static FileStorageManager sm;
   private static Path workDir;
   private static Path tablePath;
   private static TableMeta employeeMeta;
@@ -70,7 +70,7 @@ public class TestSortExec {
     util = TpchTestBase.getInstance().getTestingCluster();
     catalog = util.getMaster().getCatalog();
     workDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    sm = StorageManager.getStorageManager(conf, workDir);
+    sm = (FileStorageManager)StorageManager.getFileStorageManager(conf, workDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -82,7 +82,8 @@ public class TestSortExec {
     tablePath = StorageUtil.concatPath(workDir, "employee", "table1");
     sm.getFileSystem().mkdirs(tablePath.getParent());
 
-    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, tablePath);
+    Appender appender = ((FileStorageManager)StorageManager.getFileStorageManager(conf))
+        .getAppender(employeeMeta, schema, tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < 100; i++) {
@@ -110,7 +111,7 @@ public class TestSortExec {
 
   @Test
   public final void testNext() throws IOException, PlanningException {
-    FileFragment[] frags = StorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
+    FileFragment[] frags = FileStorageManager.splitNG(conf, "default.employee", employeeMeta, tablePath, Integer.MAX_VALUE);
     Path workDir = CommonTestingUtil.getTestDir("target/test-data/TestSortExec");
     TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf),
         LocalTajoTestingUtility
@@ -120,7 +121,7 @@ public class TestSortExec {
     LogicalPlan plan = planner.createPlan(LocalTajoTestingUtility.createDummyContext(conf), context);
     LogicalNode rootNode = optimizer.optimize(plan);
 
-    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf, sm);
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf);
     PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
 
     Tuple tuple;


Mime
View raw message