tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [5/5] git commit: TAJO-1121: Remove the 'v2' storage package.
Date Thu, 23 Oct 2014 03:19:33 GMT
TAJO-1121: Remove the 'v2' storage package.

Closes #203


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b636e8b8
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b636e8b8
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b636e8b8

Branch: refs/heads/master
Commit: b636e8b8be623a87d291d7be7aa1dd645502469f
Parents: 0fb2285
Author: HyoungJun Kim <babokim@babokim-MacBook-Pro.local>
Authored: Wed Oct 22 23:46:59 2014 +0900
Committer: HyoungJun Kim <babokim@babokim-MacBook-Pro.local>
Committed: Thu Oct 23 12:18:40 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |    1 +
 .../java/org/apache/tajo/conf/TajoConf.java     |    7 -
 .../engine/planner/PhysicalPlannerImpl.java     |    6 +-
 .../planner/physical/BSTIndexScanExec.java      |    4 +-
 .../planner/physical/ColPartitionStoreExec.java |    4 +-
 .../planner/physical/ExternalSortExec.java      |    6 +-
 .../physical/HashShuffleFileWriteExec.java      |    4 +-
 .../physical/PartitionMergeScanExec.java        |    6 +-
 .../physical/RangeShuffleFileWriteExec.java     |    4 +-
 .../engine/planner/physical/SeqScanExec.java    |    4 +-
 .../engine/planner/physical/StoreTableExec.java |    6 +-
 .../org/apache/tajo/master/GlobalEngine.java    |    2 +-
 .../master/NonForwardQueryResultScanner.java    |    4 +-
 .../java/org/apache/tajo/master/TajoMaster.java |   11 +-
 .../apache/tajo/master/querymaster/Query.java   |    4 +-
 .../tajo/master/querymaster/QueryMaster.java    |    9 +-
 .../master/querymaster/QueryMasterTask.java     |    4 +-
 .../tajo/master/querymaster/Repartitioner.java  |    6 +-
 .../tajo/master/querymaster/SubQuery.java       |    9 +-
 .../org/apache/tajo/worker/TajoQueryEngine.java |    7 +-
 .../tajo/worker/WorkerHeartbeatService.java     |    6 +-
 .../org/apache/tajo/BackendTestingUtil.java     |    2 +-
 .../planner/global/TestBroadcastJoinPlan.java   |    4 +-
 .../planner/physical/TestBNLJoinExec.java       |    8 +-
 .../planner/physical/TestBSTIndexExec.java      |    8 +-
 .../planner/physical/TestExternalSortExec.java  |    6 +-
 .../physical/TestFullOuterHashJoinExec.java     |   12 +-
 .../physical/TestFullOuterMergeJoinExec.java    |   14 +-
 .../planner/physical/TestHashAntiJoinExec.java  |    8 +-
 .../planner/physical/TestHashJoinExec.java      |    8 +-
 .../planner/physical/TestHashSemiJoinExec.java  |    8 +-
 .../physical/TestLeftOuterHashJoinExec.java     |   12 +-
 .../physical/TestLeftOuterNLJoinExec.java       |   12 +-
 .../planner/physical/TestMergeJoinExec.java     |    8 +-
 .../engine/planner/physical/TestNLJoinExec.java |    8 +-
 .../planner/physical/TestPhysicalPlanner.java   |   18 +-
 .../physical/TestProgressExternalSortExec.java  |    6 +-
 .../physical/TestRightOuterHashJoinExec.java    |   10 +-
 .../physical/TestRightOuterMergeJoinExec.java   |   14 +-
 .../engine/planner/physical/TestSortExec.java   |    6 +-
 .../tajo/engine/query/TestJoinBroadcast.java    |    2 +-
 .../org/apache/tajo/jdbc/TestResultSet.java     |    6 +-
 .../tajo/master/TestExecutionBlockCursor.java   |    5 +-
 .../org/apache/tajo/storage/TestRowFile.java    |    6 +-
 .../tajo/worker/TestRangeRetrieverHandler.java  |    8 +-
 .../tajo/storage/AbstractStorageManager.java    |  729 -------
 .../org/apache/tajo/storage/DiskDeviceInfo.java |   62 +
 .../java/org/apache/tajo/storage/DiskInfo.java  |   75 +
 .../org/apache/tajo/storage/DiskMountInfo.java  |  101 +
 .../java/org/apache/tajo/storage/DiskUtil.java  |  207 ++
 .../storage/HashShuffleAppenderManager.java     |    3 +-
 .../org/apache/tajo/storage/MergeScanner.java   |    2 +-
 .../org/apache/tajo/storage/StorageManager.java |  743 ++++++-
 .../tajo/storage/StorageManagerFactory.java     |   98 -
 .../apache/tajo/storage/v2/CSVFileScanner.java  |  386 ----
 .../apache/tajo/storage/v2/DiskDeviceInfo.java  |   62 -
 .../tajo/storage/v2/DiskFileScanScheduler.java  |  205 --
 .../org/apache/tajo/storage/v2/DiskInfo.java    |   75 -
 .../apache/tajo/storage/v2/DiskMountInfo.java   |  101 -
 .../org/apache/tajo/storage/v2/DiskUtil.java    |  207 --
 .../apache/tajo/storage/v2/FileScanRunner.java  |   70 -
 .../apache/tajo/storage/v2/FileScannerV2.java   |  234 ---
 .../java/org/apache/tajo/storage/v2/RCFile.java | 1818 ------------------
 .../apache/tajo/storage/v2/RCFileScanner.java   |  300 ---
 .../apache/tajo/storage/v2/ScanScheduler.java   |  192 --
 .../tajo/storage/v2/ScheduledInputStream.java   |  513 -----
 .../tajo/storage/v2/StorageManagerV2.java       |  152 --
 .../tajo/storage/TestCompressionStorages.java   |   10 +-
 .../apache/tajo/storage/TestFileSystems.java    |    4 +-
 .../apache/tajo/storage/TestMergeScanner.java   |    8 +-
 .../apache/tajo/storage/TestStorageManager.java |   12 +-
 .../org/apache/tajo/storage/TestStorages.java   |   48 +-
 .../apache/tajo/storage/index/TestBSTIndex.java |   52 +-
 .../index/TestSingleCSVFileBSTIndex.java        |    4 +-
 .../tajo/storage/v2/TestCSVCompression.java     |  213 --
 .../apache/tajo/storage/v2/TestCSVScanner.java  |  168 --
 .../apache/tajo/storage/v2/TestStorages.java    |  292 ---
 .../src/test/resources/storage-default.xml      |   57 -
 78 files changed, 1407 insertions(+), 6109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 3d44a7c..e498d9e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -14,6 +14,7 @@ Release 0.9.1 - unreleased
     TAJO-1092: Improve the function system to allow other function 
     implementation types. (hyunsik)
 
+    TAJO-1121: Remove the 'v2' storage package. (Hyoungjun Kim)
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
index 66d3030..181ef2e 100644
--- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java
@@ -233,13 +233,6 @@ public class TajoConf extends Configuration {
     // for RCFile
     HIVEUSEEXPLICITRCFILEHEADER("tajo.exec.rcfile.use.explicit.header", true),
 
-    // for Storage Manager v2
-    STORAGE_MANAGER_VERSION_2("tajo.storage-manager.v2", false),
-    STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT("tajo.storage-manager.max-read-bytes", 8 * 1024 * 1024),
-    STORAGE_MANAGER_DISK_SCHEDULER_REPORT_INTERVAL("tajo.storage-manager.disk-scheduler.report-interval", 60 * 1000),
-    STORAGE_MANAGER_CONCURRENCY_PER_DISK("tajo.storage-manager.disk-scheduler.per-disk-concurrency", 2),
-
-
     // RPC --------------------------------------------------------------------
     RPC_POOL_MAX_IDLE("tajo.rpc.pool.idle.max", 10),
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
index 6b1c65c..485677a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java
@@ -45,8 +45,8 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.DistinctAggregationAlgorithm;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage;
 import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecArray;
-import org.apache.tajo.storage.AbstractStorageManager;
 import org.apache.tajo.storage.StorageConstants;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.TupleComparator;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.FragmentConvertor;
@@ -75,9 +75,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner {
   private static final int UNGENERATED_PID = -1;
 
   protected final TajoConf conf;
-  protected final AbstractStorageManager sm;
+  protected final StorageManager sm;
 
-  public PhysicalPlannerImpl(final TajoConf conf, final AbstractStorageManager sm) {
+  public PhysicalPlannerImpl(final TajoConf conf, final StorageManager sm) {
     this.conf = conf;
     this.sm = sm;
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
index bdd1e07..51f70a0 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java
@@ -48,7 +48,7 @@ public class BSTIndexScanExec extends PhysicalExec {
   private float progress;
 
   public BSTIndexScanExec(TaskAttemptContext context,
-                          AbstractStorageManager sm , ScanNode scanNode ,
+                          StorageManager sm , ScanNode scanNode ,
        FileFragment fragment, Path fileName , Schema keySchema,
        TupleComparator comparator , Datum[] datum) throws IOException {
     super(context, scanNode.getInSchema(), scanNode.getOutSchema());
@@ -56,7 +56,7 @@ public class BSTIndexScanExec extends PhysicalExec {
     this.qual = scanNode.getQual();
     this.datum = datum;
 
-    this.fileScanner = StorageManagerFactory.getSeekableScanner(context.getConf(),
+    this.fileScanner = StorageManager.getSeekableScanner(context.getConf(),
         scanNode.getTableDesc().getMeta(), scanNode.getInSchema(), fragment, outSchema);
     this.fileScanner.init();
     this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index 92738e5..a072904 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -35,7 +35,7 @@ import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -160,7 +160,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
       actualFilePath = new Path(lastFileName + "_" + suffixId);
     }
 
-    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, actualFilePath);
+    appender = StorageManager.getStorageManager(context.getConf()).getAppender(meta, outSchema, actualFilePath);
 
     appender.enableStats();
     appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
index 0094590..cfd7fb7 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java
@@ -106,7 +106,7 @@ public class ExternalSortExec extends SortExec {
   /** total bytes of input data */
   private long sortAndStoredBytes;
 
-  private ExternalSortExec(final TaskAttemptContext context, final AbstractStorageManager sm, final SortNode plan)
+  private ExternalSortExec(final TaskAttemptContext context, final StorageManager sm, final SortNode plan)
       throws PhysicalPlanningException {
     super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys());
 
@@ -129,7 +129,7 @@ public class ExternalSortExec extends SortExec {
   }
 
   public ExternalSortExec(final TaskAttemptContext context,
-                          final AbstractStorageManager sm, final SortNode plan,
+                          final StorageManager sm, final SortNode plan,
                           final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException {
     this(context, sm, plan);
 
@@ -141,7 +141,7 @@ public class ExternalSortExec extends SortExec {
   }
 
   public ExternalSortExec(final TaskAttemptContext context,
-                          final AbstractStorageManager sm, final SortNode plan, final PhysicalExec child)
+                          final StorageManager sm, final SortNode plan, final PhysicalExec child)
       throws IOException {
     this(context, sm, plan);
     setChild(child);

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index cee2b77..47fcb8d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -27,7 +27,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.engine.planner.logical.ShuffleFileWriteNode;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.HashShuffleAppender;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.Tuple;
@@ -55,7 +55,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
   private HashShuffleAppenderManager hashShuffleAppenderManager;
   private int numHashShuffleBufferTuples;
 
-  public HashShuffleFileWriteExec(TaskAttemptContext context, final AbstractStorageManager sm,
+  public HashShuffleFileWriteExec(TaskAttemptContext context, final StorageManager sm,
                                   final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     Preconditions.checkArgument(plan.hasShuffleKeys());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
index d87a30d..4c72075 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java
@@ -23,7 +23,7 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.PlannerUtil;
 import org.apache.tajo.engine.planner.logical.ScanNode;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -44,12 +44,12 @@ public class PartitionMergeScanExec extends PhysicalExec {
   private List<SeqScanExec> scanners = Lists.newArrayList();
   private Iterator<SeqScanExec> iterator;
 
-  private AbstractStorageManager sm;
+  private StorageManager sm;
 
   private float progress;
   protected TableStats inputStats;
 
-  public PartitionMergeScanExec(TaskAttemptContext context, AbstractStorageManager sm,
+  public PartitionMergeScanExec(TaskAttemptContext context, StorageManager sm,
                                 ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 68379d1..4e5d1cf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -51,7 +51,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
   private FileAppender appender;
   private TableMeta meta;
 
-  public RangeShuffleFileWriteExec(final TaskAttemptContext context, final AbstractStorageManager sm,
+  public RangeShuffleFileWriteExec(final TaskAttemptContext context, final StorageManager sm,
                                    final PhysicalExec child, final Schema inSchema, final Schema outSchema,
                                    final SortSpec[] sortSpecs) throws IOException {
     super(context, inSchema, outSchema, child);
@@ -78,7 +78,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
         context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW);
     FileSystem fs = new RawLocalFileSystem();
     fs.mkdirs(storeTablePath);
-    this.appender = (FileAppender) StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+    this.appender = (FileAppender) StorageManager.getStorageManager(context.getConf()).getAppender(meta,
         outSchema, new Path(storeTablePath, "output"));
     this.appender.enableStats();
     this.appender.init();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
index 2f0c12f..c7f8e2d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java
@@ -67,7 +67,7 @@ public class SeqScanExec extends PhysicalExec {
 
   private boolean cacheRead = false;
 
-  public SeqScanExec(TaskAttemptContext context, AbstractStorageManager sm, ScanNode plan,
+  public SeqScanExec(TaskAttemptContext context, StorageManager sm, ScanNode plan,
                      CatalogProtos.FragmentProto [] fragments) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema());
 
@@ -218,7 +218,7 @@ public class SeqScanExec extends PhysicalExec {
                 fragments), projected
         );
       } else {
-        this.scanner = StorageManagerFactory.getStorageManager(
+        this.scanner = StorageManager.getStorageManager(
             context.getConf()).getScanner(plan.getTableDesc().getMeta(), plan.getPhysicalSchema(), fragments[0],
             projected);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index fd0c04f..b496e42 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -29,7 +29,7 @@ import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
 import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -89,10 +89,10 @@ public class StoreTableExec extends UnaryPhysicalExec {
 
     if (plan instanceof InsertNode) {
       InsertNode createTableNode = (InsertNode) plan;
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta,
+      appender = StorageManager.getStorageManager(context.getConf()).getAppender(meta,
           createTableNode.getTableSchema(), context.getOutputPath());
     } else {
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, lastFileName);
+      appender = StorageManager.getStorageManager(context.getConf()).getAppender(meta, outSchema, lastFileName);
     }
 
     appender.enableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
index 342f8d7..382b789 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java
@@ -79,7 +79,7 @@ public class GlobalEngine extends AbstractService {
   private final static Log LOG = LogFactory.getLog(GlobalEngine.class);
 
   private final MasterContext context;
-  private final AbstractStorageManager sm;
+  private final StorageManager sm;
 
   private SQLAnalyzer analyzer;
   private CatalogService catalog;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
index f8c51fd..c0bd842 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/NonForwardQueryResultScanner.java
@@ -33,7 +33,7 @@ import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.session.Session;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
 
@@ -88,7 +88,7 @@ public class NonForwardQueryResultScanner {
       try {
         // scanNode must be clone cause SeqScanExec change target in the case of a partitioned table.
         scanExec = new SeqScanExec(taskContext,
-            StorageManagerFactory.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments);
+            StorageManager.getStorageManager(tajoConf), (ScanNode)scanNode.clone(), fragments);
       } catch (CloneNotSupportedException e) {
         throw new IOException(e.getMessage(), e);
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
index f267921..8d4a41f 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java
@@ -49,8 +49,7 @@ import org.apache.tajo.master.rm.TajoWorkerResourceManager;
 import org.apache.tajo.master.rm.WorkerResourceManager;
 import org.apache.tajo.master.session.SessionManager;
 import org.apache.tajo.rpc.RpcChannelFactory;
-import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.*;
 import org.apache.tajo.util.metrics.TajoSystemMetrics;
 import org.apache.tajo.webapp.QueryExecutorServlet;
@@ -105,7 +104,7 @@ public class TajoMaster extends CompositeService {
 
   private CatalogServer catalogServer;
   private CatalogService catalog;
-  private AbstractStorageManager storeManager;
+  private StorageManager storeManager;
   private GlobalEngine globalEngine;
   private AsyncDispatcher dispatcher;
   private TajoMasterClientService tajoMasterClientService;
@@ -160,7 +159,7 @@ public class TajoMaster extends CompositeService {
 
       // check the system directory and create if they are not created.
       checkAndInitializeSystemDirectories();
-      this.storeManager = StorageManagerFactory.getStorageManager(systemConf);
+      this.storeManager = StorageManager.getStorageManager(systemConf);
 
       catalogServer = new CatalogServer(FunctionLoader.load());
       addIfService(catalogServer);
@@ -395,7 +394,7 @@ public class TajoMaster extends CompositeService {
     return this.catalogServer;
   }
 
-  public AbstractStorageManager getStorageManager() {
+  public StorageManager getStorageManager() {
     return this.storeManager;
   }
 
@@ -438,7 +437,7 @@ public class TajoMaster extends CompositeService {
       return globalEngine;
     }
 
-    public AbstractStorageManager getStorageManager() {
+    public StorageManager getStorageManager() {
       return storeManager;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
index d3f5b1d..b8240b8 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java
@@ -48,7 +48,7 @@ import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.event.*;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.StorageUtil;
 import org.apache.tajo.util.TUtil;
@@ -70,7 +70,7 @@ public class Query implements EventHandler<QueryEvent> {
   private Map<ExecutionBlockId, SubQuery> subqueries;
   private final EventHandler eventHandler;
   private final MasterPlan plan;
-  private final AbstractStorageManager sm;
+  private final StorageManager sm;
   QueryMasterTask.QueryMasterTaskContext context;
   private ExecutionBlockCursor cursor;
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
index b8c39e0..7c3d799 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java
@@ -41,8 +41,7 @@ import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.NullCallback;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.NetUtils;
 import org.apache.tajo.worker.TajoWorker;
@@ -71,7 +70,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
 
   private GlobalPlanner globalPlanner;
 
-  private AbstractStorageManager storageManager;
+  private StorageManager storageManager;
 
   private TajoConf systemConf;
 
@@ -116,7 +115,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis());
       addIfService(dispatcher);
 
-      this.storageManager = StorageManagerFactory.getStorageManager(systemConf);
+      this.storageManager = StorageManager.getStorageManager(systemConf);
 
       globalPlanner = new GlobalPlanner(systemConf, workerContext);
 
@@ -373,7 +372,7 @@ public class QueryMaster extends CompositeService implements EventHandler {
       return clock;
     }
 
-    public AbstractStorageManager getStorageManager() {
+    public StorageManager getStorageManager() {
       return storageManager;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
index 1ffaf56..cb06df9 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java
@@ -57,7 +57,7 @@ import org.apache.tajo.master.session.Session;
 import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.util.HAServiceUtil;
 import org.apache.tajo.util.metrics.TajoMetrics;
 import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter;
@@ -556,7 +556,7 @@ public class QueryMasterTask extends CompositeService {
       return queryId;
     }
 
-    public AbstractStorageManager getStorageManager() {
+    public StorageManager getStorageManager() {
       return queryMasterContext.getStorageManager();
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index a061611..615ebcf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -47,7 +47,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAg
 import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty;
 import org.apache.tajo.master.TaskSchedulerContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.RowStoreUtil;
 import org.apache.tajo.storage.TupleRange;
 import org.apache.tajo.storage.fragment.FileFragment;
@@ -81,7 +81,7 @@ public class Repartitioner {
     MasterPlan masterPlan = subQuery.getMasterPlan();
     ExecutionBlock execBlock = subQuery.getBlock();
     QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext();
-    AbstractStorageManager storageManager = subQuery.getStorageManager();
+    StorageManager storageManager = subQuery.getStorageManager();
 
     ScanNode[] scans = execBlock.getScanNodes();
 
@@ -478,7 +478,7 @@ public class Repartitioner {
   /**
    * It creates a number of fragments for all partitions.
    */
-  public static List<FileFragment> getFragmentsFromPartitionedTable(AbstractStorageManager sm,
+  public static List<FileFragment> getFragmentsFromPartitionedTable(StorageManager sm,
                                                                           ScanNode scan,
                                                                           TableDesc table) throws IOException {
     List<FileFragment> fragments = Lists.newArrayList();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
index 907fc65..a126144 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java
@@ -61,7 +61,7 @@ import org.apache.tajo.master.TaskRunnerGroupEvent.EventType;
 import org.apache.tajo.master.event.*;
 import org.apache.tajo.master.event.QueryUnitAttemptScheduleEvent.QueryUnitAttemptScheduleContext;
 import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry;
-import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.KeyValueSet;
@@ -94,7 +94,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private TableStats resultStatistics;
   private TableStats inputStatistics;
   private EventHandler<Event> eventHandler;
-  private final AbstractStorageManager sm;
+  private final StorageManager sm;
   private AbstractTaskScheduler taskScheduler;
   private QueryMasterTask.QueryMasterTaskContext context;
   private final List<String> diagnostics = new ArrayList<String>();
@@ -282,7 +282,8 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
   private List<IntermediateEntry> hashShuffleIntermediateEntries = new ArrayList<IntermediateEntry>();
   private AtomicInteger completeReportReceived = new AtomicInteger(0);
 
-  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block, AbstractStorageManager sm) {
+  public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan,
+                  ExecutionBlock block, StorageManager sm) {
     this.context = context;
     this.masterPlan = masterPlan;
     this.block = block;
@@ -434,7 +435,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> {
     return this.priority;
   }
 
-  public AbstractStorageManager getStorageManager() {
+  public StorageManager getStorageManager() {
     return sm;
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
index 1731854..5cf6c46 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoQueryEngine.java
@@ -24,18 +24,17 @@ import org.apache.tajo.engine.planner.PhysicalPlannerImpl;
 import org.apache.tajo.engine.planner.logical.LogicalNode;
 import org.apache.tajo.engine.planner.physical.PhysicalExec;
 import org.apache.tajo.exception.InternalException;
-import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 
 import java.io.IOException;
 
 public class TajoQueryEngine {
 
-  private final AbstractStorageManager storageManager;
+  private final StorageManager storageManager;
   private final PhysicalPlanner phyPlanner;
 
   public TajoQueryEngine(TajoConf conf) throws IOException {
-    this.storageManager = StorageManagerFactory.getStorageManager(conf);
+    this.storageManager = StorageManager.getStorageManager(conf);
     this.phyPlanner = new PhysicalPlannerImpl(conf, storageManager);
   }
   

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 6a90f74..f12e83c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -32,9 +32,9 @@ import org.apache.tajo.rpc.CallFuture;
 import org.apache.tajo.rpc.NettyClientBase;
 import org.apache.tajo.rpc.RpcConnectionPool;
 import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos;
-import org.apache.tajo.storage.v2.DiskDeviceInfo;
-import org.apache.tajo.storage.v2.DiskMountInfo;
-import org.apache.tajo.storage.v2.DiskUtil;
+import org.apache.tajo.storage.DiskDeviceInfo;
+import org.apache.tajo.storage.DiskMountInfo;
+import org.apache.tajo.storage.DiskUtil;
 import org.apache.tajo.util.HAServiceUtil;
 
 import java.io.File;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
index 0064e41..45d3c51 100644
--- a/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
+++ b/tajo-core/src/test/java/org/apache/tajo/BackendTestingUtil.java
@@ -47,7 +47,7 @@ public class BackendTestingUtil {
 
   public static void writeTmpTable(TajoConf conf, Path tablePath)
       throws IOException {
-    AbstractStorageManager sm = StorageManagerFactory.getStorageManager(conf, tablePath);
+    StorageManager sm = StorageManager.getStorageManager(conf, tablePath);
     FileSystem fs = sm.getFileSystem();
 
     Appender appender;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
index f1122ce..8810692 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/global/TestBroadcastJoinPlan.java
@@ -44,7 +44,7 @@ import org.apache.tajo.engine.planner.logical.*;
 import org.apache.tajo.engine.query.QueryContext;
 import org.apache.tajo.master.TajoMaster;
 import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageManagerFactory;
+import org.apache.tajo.storage.StorageManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.CommonTestingUtil;
@@ -141,7 +141,7 @@ public class TestBroadcastJoinPlan {
         contentsData += j;
       }
     }
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(tableMeta, schema,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(tableMeta, schema,
         dataPath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
index 3fecabd..197ff65 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBNLJoinExec.java
@@ -59,7 +59,7 @@ public class TestBNLJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private static int OUTER_TUPLE_NUM = 1000;
@@ -76,7 +76,7 @@ public class TestBNLJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -86,7 +86,7 @@ public class TestBNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < OUTER_TUPLE_NUM; i++) {
@@ -107,7 +107,7 @@ public class TestBNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < INNER_TUPLE_NUM; i += 2) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
index f817776..00a4416 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestBSTIndexExec.java
@@ -66,7 +66,7 @@ public class TestBSTIndexExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Schema idxSchema;
   private TupleComparator comp;
   private BSTIndex.BSTIndexWriter writer;
@@ -91,7 +91,7 @@ public class TestBSTIndexExec {
     Path workDir = CommonTestingUtil.getTestDir();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, workDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
-    sm = StorageManagerFactory.getStorageManager(conf, workDir);
+    sm = StorageManager.getStorageManager(conf, workDir);
 
     idxPath = new Path(workDir, "test.idx");
 
@@ -117,7 +117,7 @@ public class TestBSTIndexExec {
     fs = tablePath.getFileSystem(conf);
     fs.mkdirs(tablePath.getParent());
 
-    FileAppender appender = (FileAppender)StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+    FileAppender appender = (FileAppender)StorageManager.getStorageManager(conf).getAppender(meta, schema,
         tablePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
@@ -189,7 +189,7 @@ public class TestBSTIndexExec {
   }
 
   private class TmpPlanner extends PhysicalPlannerImpl {
-    public TmpPlanner(TajoConf conf, AbstractStorageManager sm) {
+    public TmpPlanner(TajoConf conf, StorageManager sm) {
       super(conf, sm);
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
index e7aac3c..6572506 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestExternalSortExec.java
@@ -56,7 +56,7 @@ public class TestExternalSortExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private final int numTuple = 100000;
@@ -73,7 +73,7 @@ public class TestExternalSortExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -82,7 +82,7 @@ public class TestExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
     Tuple tuple = new VTuple(schema.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
index 548d43a..aba5169 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterHashJoinExec.java
@@ -58,7 +58,7 @@ public class TestFullOuterHashJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -81,7 +81,7 @@ public class TestFullOuterHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -104,7 +104,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -133,7 +133,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -172,7 +172,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -224,7 +224,7 @@ public class TestFullOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
         phone3Path);
     appender5.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
index 1b9f7aa..7998c61 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestFullOuterMergeJoinExec.java
@@ -59,7 +59,7 @@ public class TestFullOuterMergeJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -85,7 +85,7 @@ public class TestFullOuterMergeJoinExec {
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
 
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -108,7 +108,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -146,7 +146,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     Tuple tuple4 = new VTuple(dep4Schema.size());
     for (int i = 0; i < 11; i++) {
@@ -177,7 +177,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -216,7 +216,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -268,7 +268,7 @@ public class TestFullOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
         phone3Path);
     appender5.init();
     appender5.flush();

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
index 7055e44..77f524d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashAntiJoinExec.java
@@ -57,7 +57,7 @@ public class TestHashAntiJoinExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -72,7 +72,7 @@ public class TestHashAntiJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -82,7 +82,7 @@ public class TestHashAntiJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
         employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
@@ -108,7 +108,7 @@ public class TestHashAntiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 10; i += 2) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
index a8828ab..eff647d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashJoinExec.java
@@ -59,7 +59,7 @@ public class TestHashJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -75,7 +75,7 @@ public class TestHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -85,7 +85,7 @@ public class TestHashJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
         employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
@@ -108,7 +108,7 @@ public class TestHashJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 10; i += 2) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
index 6373f23..148592d 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestHashSemiJoinExec.java
@@ -57,7 +57,7 @@ public class TestHashSemiJoinExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private LogicalOptimizer optimizer;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -72,7 +72,7 @@ public class TestHashSemiJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -82,7 +82,7 @@ public class TestHashSemiJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
         employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
@@ -108,7 +108,7 @@ public class TestHashSemiJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     // make 27 tuples

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
index ef740e3..3e40485 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterHashJoinExec.java
@@ -59,7 +59,7 @@ public class TestLeftOuterHashJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -82,7 +82,7 @@ public class TestLeftOuterHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -105,7 +105,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -134,7 +134,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -173,7 +173,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -225,7 +225,7 @@ public class TestLeftOuterHashJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
         phone3Path);
     appender5.init();
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
index 2a8d9f0..9ee51c3 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestLeftOuterNLJoinExec.java
@@ -60,7 +60,7 @@ public class TestLeftOuterNLJoinExec {
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
   private QueryContext defaultContext;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private TableDesc dep3;
@@ -81,7 +81,7 @@ public class TestLeftOuterNLJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -104,7 +104,7 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -133,7 +133,7 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -172,7 +172,7 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -224,7 +224,7 @@ public class TestLeftOuterNLJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
         phone3Path);
     appender5.init();
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
index ed19e4f..ddad0a2 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestMergeJoinExec.java
@@ -60,7 +60,7 @@ public class TestMergeJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
 
   private TableDesc employee;
   private TableDesc people;
@@ -75,7 +75,7 @@ public class TestMergeJoinExec {
     catalog.createDatabase(TajoConstants.DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
     FileSystem fs = testDir.getFileSystem(conf);
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema employeeSchema = new Schema();
     employeeSchema.addColumn("managerid", Type.INT4);
@@ -85,7 +85,7 @@ public class TestMergeJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
         employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
@@ -114,7 +114,7 @@ public class TestMergeJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 10; i += 2) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
index 1120bbe..20053cb 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestNLJoinExec.java
@@ -60,7 +60,7 @@ public class TestNLJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private TableDesc employee;
@@ -76,7 +76,7 @@ public class TestNLJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", Type.INT4);
@@ -86,7 +86,7 @@ public class TestNLJoinExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.init();
     Tuple tuple = new VTuple(schema.size());
     for (int i = 0; i < 50; i++) {
@@ -109,7 +109,7 @@ public class TestNLJoinExec {
     peopleSchema.addColumn("age", Type.INT4);
     TableMeta peopleMeta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path peoplePath = new Path(testDir, "people.csv");
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(peopleMeta, peopleSchema, peoplePath);
     appender.init();
     tuple = new VTuple(peopleSchema.size());
     for (int i = 1; i < 50; i += 2) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index 8262d8f..30c2cd5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -84,7 +84,7 @@ public class TestPhysicalPlanner {
   private static SQLAnalyzer analyzer;
   private static LogicalPlanner planner;
   private static LogicalOptimizer optimizer;
-  private static AbstractStorageManager sm;
+  private static StorageManager sm;
   private static Path testDir;
   private static Session session = LocalTajoTestingUtility.createDummySession();
   private static QueryContext defaultContext;
@@ -102,7 +102,7 @@ public class TestPhysicalPlanner {
     util.startCatalogCluster();
     conf = util.getConfiguration();
     testDir = CommonTestingUtil.getTestDir("target/test-data/TestPhysicalPlanner");
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
     catalog = util.getMiniCatalogCluster().getCatalog();
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
@@ -125,7 +125,7 @@ public class TestPhysicalPlanner {
 
 
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, employeeSchema,
         employeePath);
     appender.init();
     Tuple tuple = new VTuple(employeeSchema.size());
@@ -144,7 +144,7 @@ public class TestPhysicalPlanner {
 
     Path scorePath = new Path(testDir, "score");
     TableMeta scoreMeta = CatalogUtil.newTableMeta(StoreType.CSV, new KeyValueSet());
-    appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, scorePath);
+    appender = StorageManager.getStorageManager(conf).getAppender(scoreMeta, scoreSchema, scorePath);
     appender.init();
     score = new TableDesc(
         CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score"), scoreSchema, scoreMeta,
@@ -185,7 +185,7 @@ public class TestPhysicalPlanner {
 
     Schema scoreSchmea = score.getSchema();
     TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea,
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea,
         scoreLargePath);
     appender.enableStats();
     appender.init();
@@ -448,7 +448,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
+    Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
         ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -508,7 +508,7 @@ public class TestPhysicalPlanner {
     // checking the file contents
     long totalNum = 0;
     for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
-      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(
+      Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(
           CatalogUtil.newTableMeta(StoreType.CSV),
           rootNode.getOutSchema(),
           status.getPath());
@@ -545,7 +545,7 @@ public class TestPhysicalPlanner {
     exec.next();
     exec.close();
 
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
+    Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(outputMeta, rootNode.getOutSchema(),
         ctx.getOutputPath());
     scanner.init();
     Tuple tuple;
@@ -1080,7 +1080,7 @@ public class TestPhysicalPlanner {
     Path outputPath = StorageUtil.concatPath(workDir, "output", "output");
     TableMeta meta = CatalogUtil.newTableMeta(channel.getStoreType(), new KeyValueSet());
     SeekableScanner scanner =
-        StorageManagerFactory.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
+        StorageManager.getSeekableScanner(conf, meta, exec.getSchema(), outputPath);
     scanner.init();
 
     int cnt = 0;

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
index f649dac..c8b9e4a 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestProgressExternalSortExec.java
@@ -62,7 +62,7 @@ public class TestProgressExternalSortExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
 
   private final int numTuple = 100000;
@@ -80,7 +80,7 @@ public class TestProgressExternalSortExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf.setVar(TajoConf.ConfVars.WORKER_TEMPORAL_DIR, testDir.toString());
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     Schema schema = new Schema();
     schema.addColumn("managerid", TajoDataTypes.Type.INT4);
@@ -89,7 +89,7 @@ public class TestProgressExternalSortExec {
 
     TableMeta employeeMeta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.RAW);
     Path employeePath = new Path(testDir, "employee.csv");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(employeeMeta, schema, employeePath);
     appender.enableStats();
     appender.init();
     Tuple tuple = new VTuple(schema.size());

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
index 506555e..09f5ab8 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterHashJoinExec.java
@@ -59,7 +59,7 @@ public class TestRightOuterHashJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -80,7 +80,7 @@ public class TestRightOuterHashJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -103,7 +103,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -132,7 +132,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -171,7 +171,7 @@ public class TestRightOuterHashJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b636e8b8/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
index 0ce74eb..f8e2f41 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestRightOuterMergeJoinExec.java
@@ -58,7 +58,7 @@ public class TestRightOuterMergeJoinExec {
   private CatalogService catalog;
   private SQLAnalyzer analyzer;
   private LogicalPlanner planner;
-  private AbstractStorageManager sm;
+  private StorageManager sm;
   private Path testDir;
   private QueryContext defaultContext;
 
@@ -84,7 +84,7 @@ public class TestRightOuterMergeJoinExec {
     catalog.createTablespace(DEFAULT_TABLESPACE_NAME, testDir.toUri().toString());
     catalog.createDatabase(DEFAULT_DATABASE_NAME, DEFAULT_TABLESPACE_NAME);
     conf = util.getConfiguration();
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 
     //----------------- dep3 ------------------------------
     // dep_id | dep_name  | loc_id
@@ -107,7 +107,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep3Path = new Path(testDir, "dep3.csv");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(dep3Meta, dep3Schema, dep3Path);
     appender1.init();
     Tuple tuple = new VTuple(dep3Schema.size());
     for (int i = 0; i < 10; i++) {
@@ -145,7 +145,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta dep4Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path dep4Path = new Path(testDir, "dep4.csv");
-    Appender appender4 = StorageManagerFactory.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
+    Appender appender4 = StorageManager.getStorageManager(conf).getAppender(dep4Meta, dep4Schema, dep4Path);
     appender4.init();
     Tuple tuple4 = new VTuple(dep4Schema.size());
     for (int i = 0; i < 11; i++) {
@@ -176,7 +176,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta job3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path job3Path = new Path(testDir, "job3.csv");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(job3Meta, job3Schema, job3Path);
     appender2.init();
     Tuple tuple2 = new VTuple(job3Schema.size());
     for (int i = 1; i < 4; i++) {
@@ -215,7 +215,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta emp3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path emp3Path = new Path(testDir, "emp3.csv");
-    Appender appender3 = StorageManagerFactory.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
+    Appender appender3 = StorageManager.getStorageManager(conf).getAppender(emp3Meta, emp3Schema, emp3Path);
     appender3.init();
     Tuple tuple3 = new VTuple(emp3Schema.size());
 
@@ -267,7 +267,7 @@ public class TestRightOuterMergeJoinExec {
 
     TableMeta phone3Meta = CatalogUtil.newTableMeta(StoreType.CSV);
     Path phone3Path = new Path(testDir, "phone3.csv");
-    Appender appender5 = StorageManagerFactory.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
+    Appender appender5 = StorageManager.getStorageManager(conf).getAppender(phone3Meta, phone3Schema,
         phone3Path);
     appender5.init();
 


Mime
View raw message