tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hj...@apache.org
Subject [1/5] TAJO-1121: Remove the 'v2' storage package.
Date Wed, 22 Oct 2014 14:49:45 GMT
Repository: tajo
Updated Branches:
  refs/heads/hbase_storage [created] 0162270ba


http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
deleted file mode 100644
index 12b984e..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/ScheduledInputStream.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Seekable;
-
-import java.io.*;
-import java.util.LinkedList;
-import java.util.Queue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ScheduledInputStream extends InputStream implements Seekable, Closeable, DataInput {
-  private static final Log LOG = LogFactory.getLog(ScheduledInputStream.class);
-
-	private FSDataInputStream originStream;
-
-  private int currentScanIndex;
-
-  private Queue<ScanData> dataQueue = new LinkedList<ScanData>();
-
-  private ScanData currentScanData;
-
-  private AtomicBoolean closed = new AtomicBoolean(false);
-
-  private boolean eof = false;
-
-  private long pos;
-
-  private AtomicInteger avaliableSize = new AtomicInteger(0);
-
-  private long fileLen;
-
-  private long startOffset;
-
-  private long length;
-
-  private long endOffset;
-
-  private boolean endOfStream = false;
-
-  private Path file;
-
-  private byte readLongBuffer[] = new byte[8];
-
-  private AtomicLong totalReadBytesForFetch = new AtomicLong(0);
-
-  private AtomicLong totalReadBytesFromDisk = new AtomicLong(0);
-
-	public ScheduledInputStream(Path file, FSDataInputStream originStream,
-                              long startOffset, long length, long fileLen) throws IOException {
-		this.originStream = originStream;
-		this.startOffset = startOffset;
-		this.length = length;
-		this.endOffset = startOffset + length;
-		this.fileLen = fileLen;
-    this.file = file;
-		this.pos = this.originStream.getPos();
-
-    LOG.info("Open:" + toString());
-	}
-
-	public int getAvaliableSize() {
-		return avaliableSize.get();
-	}
-
-  public String toString() {
-    return file.getName() + ":" + startOffset + ":" + length;
-  }
-	public boolean readNext(int length) throws IOException {
-		return readNext(length, false);
-	}
-	
-	public boolean readNext(int length, boolean ignoreEOS) throws IOException {
-    synchronized(dataQueue) {
-      if(closed.get() || (!ignoreEOS && endOfStream)) {
-        return false;
-      }
-      int bufLength = ignoreEOS ? length : (int)Math.min(length,  endOffset - originStream.getPos());
-      bufLength = (int)Math.min(bufLength, fileLen - originStream.getPos());
-      if(bufLength == 0) {
-        return false;
-      }
-			byte[] buf = new byte[bufLength];
-
-      try {
-        originStream.readFully(buf);
-      } catch (EOFException e) {
-        LOG.error(e.getMessage(), e);
-        throw e;
-      } catch (Exception e) {
-        throw new IOException(e.getMessage(), e);
-      }
-
-      if(originStream.getPos() == fileLen) {
-        LOG.info("EOF:" + toString());
-        eof = true;
-      }
-      if(!ignoreEOS && originStream.getPos() >= endOffset) {
-        LOG.info("EndOfStream:" + toString());
-        endOfStream = true;
-      }
-
-      if(currentScanData == null) {
-        currentScanData = new ScanData(buf, bufLength);
-        currentScanIndex = 0;
-      } else {
-        dataQueue.offer(new ScanData(buf, bufLength));
-      }
-
-      avaliableSize.addAndGet(bufLength);
-
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Add DataQueue: queue=" + dataQueue.size() +
-          ", avaliable Size=" + avaliableSize.get() + ", pos=" + getPos() +
-          ", originPos=" + originStream.getPos() + ",endOfStream=" + endOfStream +
-          ", bufLength=" + bufLength + ",ignoreEOS=" + ignoreEOS);
-      }
-
-      totalReadBytesFromDisk.addAndGet(bufLength);
-      dataQueue.notifyAll();
-    }
-    return !eof;
-	}
-	
-	static class ScanData {
-		byte[] data;
-		int length;
-		public ScanData(byte[] buf, int length) {
-			this.data = buf;
-			this.length = length;
-		}
-		
-		@Override
-		public String toString() {
-			return "length=" + length;
-		}
-	}
-
-	@Override
-	public void seek(long pos) throws IOException {
-		synchronized(dataQueue) {
-			dataQueue.clear();
-			currentScanData = null;
-			currentScanIndex = 0;
-			avaliableSize.set(0);
-      originStream.seek(pos);
-      this.pos = pos;
-    }
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return this.pos;
-	}
-
-	public long getOriginStreamPos() {
-		try {
-			return this.originStream.getPos();
-		} catch (IOException e) {
-			e.printStackTrace();
-			return 0;
-		}
-	}
-	
-	@Override
-	public boolean seekToNewSource(long targetPos) throws IOException {
-		synchronized(dataQueue) {
-			dataQueue.clear();
-			currentScanData = null;
-			currentScanIndex = 0;
-			avaliableSize.set(0);
-      boolean result = originStream.seekToNewSource(targetPos);
-
-      this.pos = originStream.getPos();
-      return result;
-		}
-	}
-
-	@Override
-	public int read() throws IOException {
-		if(noMoreData()) {
-			return -1;
-		}
-		if(currentScanData == null || currentScanIndex >= currentScanData.length) {
-			synchronized(dataQueue) {
-				if(dataQueue.isEmpty()) {
-					if(endOfStream) {
-						readNext(64 * 1024, true);
-					} else {
-						try {
-							dataQueue.wait();
-							if(eof && dataQueue.isEmpty() && currentScanIndex > 0) {
-								//no more data
-								return -1;
-							}
-						} catch (InterruptedException e) {
-						}
-					}
-				}
-				if(!dataQueue.isEmpty() && currentScanIndex > 0) {
-					currentScanData = dataQueue.poll();
-					currentScanIndex = 0;
-				}
-			}
-		} 
-		
-		this.pos++;
-		avaliableSize.decrementAndGet();
-    totalReadBytesForFetch.incrementAndGet();
-
-		return currentScanData.data[currentScanIndex++] & 0xff;
-	}
-	
-	private boolean noMoreData() {
-		return closed.get();
-	}
-	
-	public int read(byte b[], int off, int len) throws IOException {
-		if(noMoreData()) {
-			return -1;
-		}
-		if (b == null) {
-		    throw new NullPointerException();
-		} else if (off < 0 || len < 0 || len > b.length - off) {
-		    throw new IndexOutOfBoundsException();
-		} else if (len == 0) {
-		    return 0;
-		}
-		if(currentScanData == null) {
-			synchronized(dataQueue) {
-				if(dataQueue.isEmpty()) {
-					if(endOfStream) {
-						readNext(64 * 1024, true);
-					} else {
-						try {
-							dataQueue.wait();
-							if(noMoreData()) {
-								return -1;
-							}
-						} catch (InterruptedException e) {
-						}
-					}
-				}
-				if(!dataQueue.isEmpty() && currentScanIndex > 0) {
-					currentScanData = dataQueue.poll();
-					currentScanIndex = 0;
-				}
-			}
-		} 
-		
-		int numRemainBytes = currentScanData.length - currentScanIndex;
-		if(numRemainBytes > len) {
-			System.arraycopy(currentScanData.data, currentScanIndex, b, off, len);
-			currentScanIndex += len;
-			avaliableSize.addAndGet(0 - len);
-			pos += len;
-
-      totalReadBytesForFetch.addAndGet(len);
-			return len;
-		} else {
-			int offset = off;
-			int length = 0;
-			int numCopyBytes = numRemainBytes;
-			while(true) {
-				synchronized(dataQueue) {
-					if(numCopyBytes == 0 && eof && dataQueue.isEmpty()) {
-						return -1;
-					}
-				}
-				System.arraycopy(currentScanData.data, currentScanIndex, b, offset, numCopyBytes);
-				currentScanIndex += numCopyBytes;
-				offset += numCopyBytes;
-				length += numCopyBytes;
-				if(length >= len) {
-					break;
-				}
-				synchronized(dataQueue) {
-					if(dataQueue.isEmpty()) {
-						if(eof) {
-							break;
-						}
-						if(endOfStream) {
-							readNext(64 * 1024, true);
-						} else {
-							try {
-								dataQueue.wait();
-							} catch (InterruptedException e) {
-							}
-						}
-					}
-					if(eof && dataQueue.isEmpty()) {
-						break;
-					}
-					if(!dataQueue.isEmpty() && currentScanIndex > 0) {
-						currentScanData = dataQueue.poll();
-						currentScanIndex = 0;
-					}
-					if(currentScanData == null) {
-						break;
-					}
-				}
-        if(currentScanData.length >= (len - length)) {
-          numCopyBytes = (len - length);
-        } else {
-          numCopyBytes = currentScanData.length;
-        }
-			}  //end of while
-			this.pos += length;
-			avaliableSize.addAndGet(0 - length);
-
-      totalReadBytesForFetch.addAndGet(length);
-			return length;
-		}
-	}
-
-  public long getTotalReadBytesForFetch() {
-    return totalReadBytesForFetch.get();
-  }
-
-  public long getTotalReadBytesFromDisk() {
-    return totalReadBytesFromDisk.get();
-  }
-
-	@Override
-	public void close() throws IOException {
-    LOG.info("Close:" + toString());
-		synchronized(dataQueue) {
-			if(closed.get()) {
-				return;
-			}
-			closed.set(true);
-			originStream.close();
-			dataQueue.clear();
-			currentScanIndex = 0;
-			super.close();
-		}
-	}
-
-	@Override
-	public void readFully(byte[] b) throws IOException {
-		readFully(b, 0, b.length);
-	}
-
-	@Override
-	public void readFully(byte[] b, int off, int len) throws IOException {
-		if (len < 0) {
-		    throw new IndexOutOfBoundsException();
-		}
-		int n = 0;
-		while (n < len) {
-		    int count = read(b, off + n, len - n);
-		    if (count < 0) {
-		    	throw new EOFException();
-		    }
-		    n += count;
-		}
-	}
-
-	@Override
-	public int skipBytes(int bytes) throws IOException {
-		int skipTotal = 0;
-		int currentPos = 0;
-
-		while ((skipTotal<bytes) && ((currentPos = (int)skip(bytes-skipTotal)) > 0)) {
-      skipTotal += currentPos;
-		}
-
-		return skipTotal;
-	}
-
-	@Override
-	public boolean readBoolean() throws IOException {
-		int val = read();
-		if (val < 0) {
-		    throw new EOFException();
-    }
-		return (val != 0);
-	}
-
-	@Override
-	public byte readByte() throws IOException {
-		int val = read();
-		if (val < 0) {
-		    throw new EOFException();
-    }
-		return (byte)(val);
-	}
-
-	@Override
-	public int readUnsignedByte() throws IOException {
-		int val = read();
-		if (val < 0) {
-		    throw new EOFException();
-    }
-		return val;
-	}
-
-	@Override
-	public short readShort() throws IOException {
-    int val1 = read();
-    int val2 = read();
-    if ((val1 | val2) < 0) {
-        throw new EOFException();
-    }
-    return (short)((val1 << 8) + (val2 << 0));
-	}
-
-	@Override
-	public int readUnsignedShort() throws IOException {
-    int val1 = read();
-    int val2 = read();
-    if ((val1 | val2) < 0) {
-        throw new EOFException();
-    }
-    return (val1 << 8) + (val2 << 0);
-	}
-
-	@Override
-	public char readChar() throws IOException {
-    int val1 = read();
-    int val2 = read();
-    if ((val1 | val2) < 0) {
-        throw new EOFException();
-    }
-    return (char)((val1 << 8) + (val2 << 0));
-	}
-
-	@Override
-	public int readInt() throws IOException {
-    int val1 = read();
-    int val2 = read();
-    int val3 = read();
-    int val4 = read();
-    if ((val1 | val2 | val3 | val4) < 0) {
-        throw new EOFException();
-    }
-    return ((val1 << 24) + (val2 << 16) + (val3 << 8) + (val4 << 0));
-	}
-
-	@Override
-	public long readLong() throws IOException {
-    readFully(readLongBuffer, 0, 8);
-    return  (((long) readLongBuffer[0] << 56) +
-            ((long)(readLongBuffer[1] & 255) << 48) +
-		        ((long)(readLongBuffer[2] & 255) << 40) +
-            ((long)(readLongBuffer[3] & 255) << 32) +
-            ((long)(readLongBuffer[4] & 255) << 24) +
-            ((readLongBuffer[5] & 255) << 16) +
-            ((readLongBuffer[6] & 255) <<  8) +
-            ((readLongBuffer[7] & 255) <<  0));
-	}
-
-	@Override
-	public float readFloat() throws IOException {
-		return Float.intBitsToFloat(readInt());
-	}
-
-	@Override
-	public double readDouble() throws IOException {
-		return Double.longBitsToDouble(readLong());
-	}
-
-	@Override
-	public String readLine() throws IOException {
-		throw new IOException("Unsupported operation: readLine");
-	}
-
-	@Override
-	public String readUTF() throws IOException {
-		throw new IOException("Unsupported operation: readUTF");
-	}
-
-	public boolean isEOF() {
-		return eof;
-	}
-
-	public boolean isEndOfStream() {
-		return endOfStream;
-	}
-
-  public void reset() {
-    synchronized(dataQueue) {
-      endOfStream = false;
-      eof = false;
-      closed.set(false);
-      dataQueue.clear();
-      currentScanIndex = 0;
-      currentScanData = null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
deleted file mode 100644
index 2fd4a99..0000000
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.storage.AbstractStorageManager;
-import org.apache.tajo.storage.NullScanner;
-import org.apache.tajo.storage.Scanner;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.fragment.Fragment;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.Queue;
-
-public final class StorageManagerV2 extends AbstractStorageManager {
-  private final Log LOG = LogFactory.getLog(StorageManagerV2.class);
-
-	private Queue<FileScannerV2> scanQueue = new LinkedList<FileScannerV2>();
-	
-	private Object scanQueueLock = new Object();
-	
-	private Object scanDataLock = new Object();
-	
-	private ScanScheduler scanScheduler;
-	
-	private StorgaeManagerContext context;
-
-  public StorageManagerV2(TajoConf conf) throws IOException {
-    super(conf);
-		context = new StorgaeManagerContext();
-		scanScheduler = new ScanScheduler(context);
-		scanScheduler.start();
-    LOG.info("StorageManager v2 started...");
-	}
-
-  @Override
-  public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException {
-    Class<? extends Scanner> scannerClass;
-
-    String handlerName = storeType.name().toLowerCase();
-    String handlerNameKey = handlerName + "_v2";
-
-    scannerClass = SCANNER_HANDLER_CACHE.get(handlerNameKey);
-    if (scannerClass == null) {
-      scannerClass = conf.getClass(String.format("tajo.storage.scanner-handler.v2.%s.class",
-          storeType.name().toLowerCase()), null, Scanner.class);
-      SCANNER_HANDLER_CACHE.put(handlerNameKey, scannerClass);
-    }
-
-    return scannerClass;
-  }
-
-  @Override
-  public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException {
-    if (fragment instanceof FileFragment) {
-      FileFragment fileFragment = (FileFragment)fragment;
-      if (fileFragment.getEndKey() == 0) {
-        Scanner scanner = new NullScanner(conf, schema, meta, fileFragment);
-        scanner.setTarget(target.toArray());
-
-        return scanner;
-      }
-    }
-
-    Scanner scanner;
-
-    Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType());
-    if (scannerClass == null) {
-      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
-    }
-
-    scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment);
-    if (scanner.isProjectable()) {
-      scanner.setTarget(target.toArray());
-    }
-
-    if(scanner instanceof FileScannerV2) {
-      ((FileScannerV2)scanner).setStorageManagerContext(context);
-    }
-    return scanner;
-  }
-
-	public void requestFileScan(FileScannerV2 fileScanner) {
-		synchronized(scanQueueLock) {
-			scanQueue.offer(fileScanner);
-			
-			scanQueueLock.notifyAll();
-		}
-	}
-
-	public StorgaeManagerContext getContext() {
-		return context;
-	}
-
-  public class StorgaeManagerContext {
-		public Object getScanQueueLock() {
-			return scanQueueLock;
-		}
-
-		public Object getScanDataLock() {
-			return scanDataLock;
-		}
-		
-		public Queue<FileScannerV2> getScanQueue() {
-			return scanQueue;
-		}
-
-		public int getMaxReadBytesPerScheduleSlot() {
-			return conf.getIntVar(TajoConf.ConfVars.STORAGE_MANAGER_DISK_SCHEDULER_MAX_READ_BYTES_PER_SLOT);
-		}
-
-    public void requestFileScan(FileScannerV2 fileScanner) {
-      StorageManagerV2.this.requestFileScan(fileScanner);
-    }
-
-    public TajoConf getConf() {
-      return conf;
-    }
-
-    public void incrementReadBytes(int diskId, long[] readBytes) {
-      scanScheduler.incrementReadBytes(diskId, readBytes);
-    }
-  }
-
-	public void stop() {
-		if(scanScheduler != null) {
-			scanScheduler.stopScheduler();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index cae0357..212f374 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -124,7 +124,7 @@ public class TestCompressionStorages {
     meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
 
     Path tablePath = new Path(testDir, "SplitCompression");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
 
@@ -156,7 +156,7 @@ public class TestCompressionStorages {
     tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
     tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum));
 
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
     assertTrue(scanner.isSplittable());
     scanner.init();
     int tupleCnt = 0;
@@ -166,7 +166,7 @@ public class TestCompressionStorages {
     }
     scanner.close();
 
-    scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+    scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
     assertTrue(scanner.isSplittable());
     scanner.init();
     while ((tuple = scanner.next()) != null) {
@@ -191,7 +191,7 @@ public class TestCompressionStorages {
 
     String fileName = "Compression_" + codec.getSimpleName();
     Path tablePath = new Path(testDir, fileName);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
 
     appender.init();
@@ -221,7 +221,7 @@ public class TestCompressionStorages {
     FileFragment[] tablets = new FileFragment[1];
     tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
 
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
 
     if (StoreType.CSV == storeType) {
       if (SplittableCompressionCodec.class.isAssignableFrom(codec)) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
index 302e0da..a355a94 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestFileSystems.java
@@ -54,7 +54,7 @@ public class TestFileSystems {
 
   private static String TEST_PATH = "target/test-data/TestFileSystem";
   private TajoConf conf = null;
-  private AbstractStorageManager sm = null;
+  private StorageManager sm = null;
   private FileSystem fs = null;
   Path testDir;
 
@@ -66,7 +66,7 @@ public class TestFileSystems {
       fs.initialize(URI.create(fs.getScheme() + ":///"), conf);
     }
     this.fs = fs;
-    sm = StorageManagerFactory.getStorageManager(conf);
+    sm = StorageManager.getStorageManager(conf);
     testDir = getTestDir(this.fs, TEST_PATH);
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
index 8c92eaf..c1a96a5 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java
@@ -49,7 +49,7 @@ import static org.junit.Assert.*;
 @RunWith(Parameterized.class)
 public class TestMergeScanner {
   private TajoConf conf;
-  AbstractStorageManager sm;
+  StorageManager sm;
   private static String TEST_PATH = "target/test-data/TestMergeScanner";
 
   private static String TEST_MULTIPLE_FILES_AVRO_SCHEMA =
@@ -95,7 +95,7 @@ public class TestMergeScanner {
     conf.setStrings("tajo.storage.projectable-scanner", "rcfile", "trevni", "parquet", "avro");
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
   }
 
   @Test
@@ -115,7 +115,7 @@ public class TestMergeScanner {
     }
 
     Path table1Path = new Path(testDir, storeType + "_1.data");
-    Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table1Path);
+    Appender appender1 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table1Path);
     appender1.enableStats();
     appender1.init();
     int tupleNum = 10000;
@@ -137,7 +137,7 @@ public class TestMergeScanner {
     }
 
     Path table2Path = new Path(testDir, storeType + "_2.data");
-    Appender appender2 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table2Path);
+    Appender appender2 = StorageManager.getStorageManager(conf).getAppender(meta, schema, table2Path);
     appender2.enableStats();
     appender2.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
index 8d1d0b3..c3d4992 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorageManager.java
@@ -47,7 +47,7 @@ import static org.junit.Assert.*;
 public class TestStorageManager {
 	private TajoConf conf;
 	private static String TEST_PATH = "target/test-data/TestStorageManager";
-	AbstractStorageManager sm = null;
+  StorageManager sm = null;
   private Path testDir;
   private FileSystem fs;
 
@@ -56,7 +56,7 @@ public class TestStorageManager {
 		conf = new TajoConf();
     testDir = CommonTestingUtil.getTestDir(TEST_PATH);
     fs = testDir.getFileSystem(conf);
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
+    sm = StorageManager.getStorageManager(conf, testDir);
 	}
 
 	@After
@@ -83,14 +83,14 @@ public class TestStorageManager {
 
     Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
     fs.mkdirs(path.getParent());
-		Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, path);
+		Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, path);
     appender.init();
 		for(Tuple t : tuples) {
 		  appender.addTuple(t);
 		}
 		appender.close();
 
-		Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, schema, path);
+		Scanner scanner = StorageManager.getStorageManager(conf).getFileScanner(meta, schema, path);
     scanner.init();
 		int i=0;
 		while(scanner.next() != null) {
@@ -124,7 +124,7 @@ public class TestStorageManager {
       }
 
       assertTrue(fs.exists(tablePath));
-      AbstractStorageManager sm = StorageManagerFactory.getStorageManager(new TajoConf(conf), tablePath);
+      StorageManager sm = StorageManager.getStorageManager(new TajoConf(conf), tablePath);
 
       Schema schema = new Schema();
       schema.addColumn("id", Type.INT4);
@@ -176,7 +176,7 @@ public class TestStorageManager {
         DFSTestUtil.createFile(fs, tmpFile, 10, (short) 2, 0xDEADDEADl);
       }
       assertTrue(fs.exists(tablePath));
-      AbstractStorageManager sm = StorageManagerFactory.getStorageManager(new TajoConf(conf), tablePath);
+      StorageManager sm = StorageManager.getStorageManager(new TajoConf(conf), tablePath);
 
       Schema schema = new Schema();
       schema.addColumn("id", Type.INT4);

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
index 5d1b652..ef5388c 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -139,7 +139,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+      Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
       int tupleNum = 10000;
@@ -163,7 +163,7 @@ public class TestStorages {
       tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
       tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
 
-      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+      Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
       assertTrue(scanner.isSplittable());
       scanner.init();
       int tupleCnt = 0;
@@ -172,7 +172,7 @@ public class TestStorages {
       }
       scanner.close();
 
-      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+      scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
       assertTrue(scanner.isSplittable());
       scanner.init();
       while (scanner.next() != null) {
@@ -193,7 +193,7 @@ public class TestStorages {
 
       TableMeta meta = CatalogUtil.newTableMeta(storeType);
       Path tablePath = new Path(testDir, "Splitable.data");
-      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+      Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
       appender.enableStats();
       appender.init();
       int tupleNum = 10000;
@@ -217,7 +217,7 @@ public class TestStorages {
       tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
       tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
 
-      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
+      Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
       assertTrue(scanner.isSplittable());
       scanner.init();
       int tupleCnt = 0;
@@ -226,7 +226,7 @@ public class TestStorages {
       }
       scanner.close();
 
-      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
+      scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
       assertTrue(scanner.isSplittable());
       scanner.init();
       while (scanner.next() != null) {
@@ -253,7 +253,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testProjection.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     int tupleNum = 10000;
     VTuple vTuple;
@@ -273,7 +273,7 @@ public class TestStorages {
     Schema target = new Schema();
     target.addColumn("age", Type.INT8);
     target.addColumn("score", Type.FLOAT4);
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target);
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment, target);
     scanner.init();
     int tupleCnt = 0;
     Tuple tuple;
@@ -321,7 +321,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
     QueryId queryid = new QueryId("12345", 5);
@@ -349,7 +349,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -391,7 +391,7 @@ public class TestStorages {
     }
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
     QueryId queryid = new QueryId("12345", 5);
@@ -432,7 +432,7 @@ public class TestStorages {
 
     FileStatus status = fs.getFileStatus(tablePath);
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -476,7 +476,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
 
@@ -507,7 +507,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -545,7 +545,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
 
@@ -576,7 +576,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     Tuple retrieved;
@@ -614,7 +614,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
 
@@ -645,7 +645,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner =  StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -687,7 +687,7 @@ public class TestStorages {
     meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
 
     Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.enableStats();
     appender.init();
 
@@ -718,7 +718,7 @@ public class TestStorages {
     assertEquals(appender.getStats().getNumBytes().longValue(), status.getLen());
 
     FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+    Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
     scanner.init();
 
     assertTrue(scanner instanceof SequenceFileScanner);
@@ -748,7 +748,7 @@ public class TestStorages {
       TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
 
       Path tablePath = new Path(testDir, "testTime.data");
-      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+      Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
       appender.init();
 
       Tuple tuple = new VTuple(3);
@@ -763,7 +763,7 @@ public class TestStorages {
 
       FileStatus status = fs.getFileStatus(tablePath);
       FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
+      Scanner scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema, fragment);
       scanner.init();
 
       Tuple retrieved;
@@ -789,7 +789,7 @@ public class TestStorages {
 
     TableMeta meta = CatalogUtil.newTableMeta(storeType);
     Path tablePath = new Path(testDir, "Seekable.data");
-    FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+    FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
 	tablePath);
     appender.enableStats();
     appender.init();
@@ -831,7 +831,7 @@ public class TestStorages {
     long readBytes = 0;
     long readRows = 0;
     for (long offset : offsets) {
-      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema,
+      scanner = StorageManager.getStorageManager(conf).getScanner(meta, schema,
 	  new FileFragment("table", tablePath, prevOffset, offset - prevOffset), schema);
       scanner.init();
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
index 54798a4..bf13f3c 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestBSTIndex.java
@@ -89,7 +89,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValue_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -124,7 +124,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -147,7 +147,7 @@ public class TestBSTIndex {
     tuple = new VTuple(keySchema.size());
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValue_" + storeType + ".idx"), keySchema, comp);
     reader.open();
-    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -177,7 +177,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testBuildIndexWithAppender_" + storeType);
-    FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema,
+    FileAppender appender = (FileAppender) StorageManager.getStorageManager(conf).getAppender(meta, schema,
         tablePath);
     appender.init();
 
@@ -226,7 +226,7 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testBuildIndexWithAppender_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = 0; i < TUPLE_NUM - 1; i++) {
@@ -256,7 +256,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = StorageUtil.concatPath(testDir, "testFindOmittedValue_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i += 2) {
@@ -289,7 +289,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -326,7 +326,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValue_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -360,7 +360,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -383,7 +383,7 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;
@@ -416,7 +416,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyOmittedValue_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i += 2) {
@@ -450,7 +450,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -473,7 +473,7 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindNextKeyOmittedValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;
@@ -495,7 +495,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindMinValue" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -529,7 +529,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -554,7 +554,7 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindMinValue_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     tuple.put(0, DatumFactory.createInt8(0));
@@ -578,7 +578,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testMinMax_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 5; i < TUPLE_NUM; i += 2) {
@@ -612,7 +612,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -682,7 +682,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testConcurrentAccess_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -717,7 +717,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -762,7 +762,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindValueDescOrder_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -798,7 +798,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -823,7 +823,7 @@ public class TestBSTIndex {
     BSTIndexReader reader = bst.getIndexReader(new Path(testDir, "testFindValueDescOrder_" + storeType + ".idx"),
         keySchema, comp);
     reader.open();
-    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     for (int i = (TUPLE_NUM - 1); i > 0; i--) {
@@ -853,7 +853,7 @@ public class TestBSTIndex {
     meta = CatalogUtil.newTableMeta(storeType);
 
     Path tablePath = new Path(testDir, "testFindNextKeyValueDescOrder_" + storeType);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
 
     Tuple tuple;
@@ -888,7 +888,7 @@ public class TestBSTIndex {
     creater.setLoadNum(LOAD_NUM);
     creater.open();
 
-    SeekableScanner scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    SeekableScanner scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple keyTuple;
@@ -916,7 +916,7 @@ public class TestBSTIndex {
     assertEquals(keySchema, reader.getKeySchema());
     assertEquals(comp, reader.getComparator());
 
-    scanner = StorageManagerFactory.getSeekableScanner(conf, meta, schema, tablet, schema);
+    scanner = StorageManager.getSeekableScanner(conf, meta, schema, tablet, schema);
     scanner.init();
 
     Tuple result;

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
index 53a2531..78b16c3 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/index/TestSingleCSVFileBSTIndex.java
@@ -78,7 +78,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindValueInSingleCSV", "table.csv");
     fs.mkdirs(tablePath.getParent());
 
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for (int i = 0; i < TUPLE_NUM; i++) {
@@ -167,7 +167,7 @@ public class TestSingleCSVFileBSTIndex {
     Path tablePath = StorageUtil.concatPath(testDir, "testFindNextKeyValueInSingleCSV",
         "table1.csv");
     fs.mkdirs(tablePath.getParent());
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
+    Appender appender = StorageManager.getStorageManager(conf).getAppender(meta, schema, tablePath);
     appender.init();
     Tuple tuple;
     for(int i = 0 ; i < TUPLE_NUM; i ++ ) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
deleted file mode 100644
index 7e95b8b..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVCompression.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.compress.*;
-import org.apache.hadoop.util.NativeCodeLoader;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.junit.Assert.assertEquals;
-
-@RunWith(Parameterized.class)
-public class TestCSVCompression {
-  private TajoConf conf;
-  private static String TEST_PATH = "target/test-data/v2/TestCSVCompression";
-
-  private CatalogProtos.StoreType storeType;
-  private Path testDir;
-  private FileSystem fs;
-
-  public TestCSVCompression(CatalogProtos.StoreType type) throws IOException {
-    this.storeType = type;
-    conf = new TajoConf();
-    conf.setBoolVar(TajoConf.ConfVars.STORAGE_MANAGER_VERSION_2, true);
-
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][]{
-        {CatalogProtos.StoreType.CSV}
-    });
-  }
-
-  @Test
-  public void testDeflateCodecCompressionData() throws IOException {
-    storageCompressionTest(storeType, DeflateCodec.class);
-  }
-
-  @Test
-  public void testGzipCodecCompressionData() throws IOException {
-    storageCompressionTest(storeType, GzipCodec.class);
-  }
-
-  @Test
-  public void testSnappyCodecCompressionData() throws IOException {
-    if (SnappyCodec.isNativeCodeLoaded()) {
-      storageCompressionTest(storeType, SnappyCodec.class);
-    }
-  }
-
-  @Test
-  public void testBzip2CodecCompressionData() throws IOException {
-    storageCompressionTest(storeType, BZip2Codec.class);
-  }
-
-  @Test
-  public void testLz4CodecCompressionData() throws IOException {
-    if(NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
-      storageCompressionTest(storeType, Lz4Codec.class);
-  }
-
-  // TODO - See https://issues.apache.org/jira/browse/HADOOP-9622
-  //@Test
-  public void testSplitCompressionData() throws IOException {
-
-    Schema schema = new Schema();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("age", TajoDataTypes.Type.INT8);
-
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
-    meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
-
-    Path tablePath = new Path(testDir, "SplitCompression");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.enableStats();
-
-    appender.init();
-
-    String extention = "";
-    if (appender instanceof CSVFile.CSVAppender) {
-      extention = ((CSVFile.CSVAppender) appender).getExtension();
-    }
-
-    int tupleNum = 100000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(2);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    TableStats stat = appender.getStats();
-    assertEquals(tupleNum, stat.getNumRows().longValue());
-    tablePath = tablePath.suffix(extention);
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    long randomNum = (long) (Math.random() * fileLen) + 1;
-
-    FileFragment[] tablets = new FileFragment[2];
-    tablets[0] = new FileFragment("SplitCompression", tablePath, 0, randomNum);
-    tablets[1] = new FileFragment("SplitCompression", tablePath, randomNum, (fileLen - randomNum));
-
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-    scanner.init();
-    int tupleCnt = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      tupleCnt++;
-    }
-    scanner.close();
-
-    scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
-    scanner.init();
-    while ((tuple = scanner.next()) != null) {
-      tupleCnt++;
-    }
-
-    scanner.close();
-    assertEquals(tupleNum, tupleCnt);
-  }
-
-  private void storageCompressionTest(CatalogProtos.StoreType storeType, Class<? extends CompressionCodec> codec)
-      throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("age", TajoDataTypes.Type.INT8);
-
-    TableMeta meta = CatalogUtil.newTableMeta(storeType);
-    meta.putOption("compression.codec", codec.getCanonicalName());
-
-    String fileName = "Compression_" + codec.getSimpleName();
-    Path tablePath = new Path(testDir, fileName);
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.enableStats();
-
-    appender.init();
-
-    String extension = "";
-    if (appender instanceof CSVFile.CSVAppender) {
-      extension = ((CSVFile.CSVAppender) appender).getExtension();
-    }
-
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    for (int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(2);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(25l));
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    TableStats stat = appender.getStats();
-    assertEquals(tupleNum, stat.getNumRows().longValue());
-    tablePath = tablePath.suffix(extension);
-    FileStatus status = fs.getFileStatus(tablePath);
-    long fileLen = status.getLen();
-    FileFragment[] tablets = new FileFragment[1];
-    tablets[0] = new FileFragment(fileName, tablePath, 0, fileLen);
-
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-    scanner.init();
-    int tupleCnt = 0;
-    while (scanner.next() != null) {
-      tupleCnt++;
-    }
-    scanner.close();
-    assertEquals(tupleCnt, tupleNum);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
deleted file mode 100644
index c356548..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestCSVScanner.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos;
-import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-import static org.junit.Assert.assertEquals;
-
-public class TestCSVScanner {
-  private TajoConf conf;
-  private static String TEST_PATH = "target/test-data/v2/TestCSVScanner";
-  AbstractStorageManager sm = null;
-  private Path testDir;
-  private FileSystem fs;
-
-  @Before
-  public void setUp() throws Exception {
-    conf = new TajoConf();
-    conf.setBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2, true);
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-    sm = StorageManagerFactory.getStorageManager(conf, testDir);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-  }
-
-  @Test
-  public final void testGetScannerAndAppender() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", TajoDataTypes.Type.INT4);
-    schema.addColumn("age", TajoDataTypes.Type.INT4);
-    schema.addColumn("name", TajoDataTypes.Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
-
-    Tuple[] tuples = new Tuple[4];
-    for(int i=0; i < tuples.length; i++) {
-      tuples[i] = new VTuple(3);
-      tuples[i].put(new Datum[] {
-          DatumFactory.createInt4(i),
-          DatumFactory.createInt4(i + 32),
-          DatumFactory.createText("name" + i)});
-    }
-
-    Path path = StorageUtil.concatPath(testDir, "testGetScannerAndAppender", "table.csv");
-    fs.mkdirs(path.getParent());
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, path);
-    appender.init();
-    for(Tuple t : tuples) {
-      appender.addTuple(t);
-    }
-    appender.close();
-
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(meta, schema, path);
-    scanner.init();
-    int i=0;
-    Tuple tuple = null;
-    while( (tuple = scanner.next()) != null) {
-      i++;
-    }
-    assertEquals(4,i);
-  }
-
-  @Test
-  public final void testPartitionFile() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("key", TajoDataTypes.Type.TEXT);
-    schema.addColumn("age", TajoDataTypes.Type.INT4);
-    schema.addColumn("name", TajoDataTypes.Type.TEXT);
-
-    TableMeta meta = CatalogUtil.newTableMeta(CatalogProtos.StoreType.CSV);
-
-
-    Path path = StorageUtil.concatPath(testDir, "testPartitionFile", "table.csv");
-    fs.mkdirs(path.getParent());
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, path);
-    appender.init();
-
-    String keyValue = "";
-    for(int i = 0; i < 100; i++) {
-      keyValue += "0123456789";
-    }
-    keyValue = "key_" + keyValue + "_";
-
-    String nameValue = "";
-    for(int i = 0; i < 100; i++) {
-      nameValue += "0123456789";
-    }
-    nameValue = "name_" + nameValue + "_";
-
-    int numTuples = 100000;
-    for(int i = 0; i < numTuples; i++) {
-      Tuple tuple = new VTuple(3);
-      tuple.put(new Datum[] {
-          DatumFactory.createText(keyValue + i),
-          DatumFactory.createInt4(i + 32),
-          DatumFactory.createText(nameValue + i)});
-      appender.addTuple(tuple);
-    }
-    appender.close();
-
-    long fileLength = fs.getLength(path);
-    long totalTupleCount = 0;
-
-    int scanCount = 0;
-    Tuple startTuple = null;
-    Tuple lastTuple = null;
-    while(true) {
-      long startOffset = (64 * 1024 * 1024) * scanCount;
-      long length = Math.min(64 * 1024 * 1024, fileLength - startOffset);
-
-      FileFragment fragment = new FileFragment("Test", path, startOffset, length, null);
-      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, schema);
-      scanner.init();
-      Tuple tuple = null;
-      while( (tuple = scanner.next()) != null) {
-        if(startTuple == null) {
-          startTuple = tuple;
-        }
-        lastTuple = tuple;
-        totalTupleCount++;
-      }
-      scanCount++;
-      if(length < 64 * 1024 * 1024) {
-        break;
-      }
-    }
-    assertEquals(numTuples, totalTupleCount);
-    assertEquals(keyValue + 0, startTuple.get(0).toString());
-    assertEquals(keyValue + (numTuples - 1), lastTuple.get(0).toString());
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
deleted file mode 100644
index 357dadb..0000000
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.v2;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.tajo.catalog.CatalogUtil;
-import org.apache.tajo.util.KeyValueSet;
-import org.apache.tajo.catalog.Schema;
-import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.Type;
-import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.datum.Datum;
-import org.apache.tajo.datum.DatumFactory;
-import org.apache.tajo.datum.NullDatum;
-import org.apache.tajo.storage.*;
-import org.apache.tajo.storage.fragment.FileFragment;
-import org.apache.tajo.storage.rcfile.RCFile;
-import org.apache.tajo.util.CommonTestingUtil;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-
-import static org.apache.tajo.conf.TajoConf.ConfVars;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class TestStorages {
-	private TajoConf conf;
-	private static String TEST_PATH = "target/test-data/v2/TestStorages";
-
-  private static String TEST_PROJECTION_AVRO_SCHEMA =
-      "{\n" +
-      "  \"type\": \"record\",\n" +
-      "  \"namespace\": \"org.apache.tajo\",\n" +
-      "  \"name\": \"testProjection\",\n" +
-      "  \"fields\": [\n" +
-      "    { \"name\": \"id\", \"type\": \"int\" },\n" +
-      "    { \"name\": \"age\", \"type\": \"long\" },\n" +
-      "    { \"name\": \"score\", \"type\": \"float\" }\n" +
-      "  ]\n" +
-      "}\n";
-
-  private static String TEST_VARIOUS_TYPES_AVRO_SCHEMA =
-      "{\n" +
-      "  \"type\": \"record\",\n" +
-      "  \"namespace\": \"org.apache.tajo\",\n" +
-      "  \"name\": \"testVariousTypes\",\n" +
-      "  \"fields\": [\n" +
-      "    { \"name\": \"col1\", \"type\": \"boolean\" },\n" +
-      "    { \"name\": \"col2\", \"type\": \"int\" },\n" +
-      "    { \"name\": \"col3\", \"type\": \"string\" },\n" +
-      "    { \"name\": \"col4\", \"type\": \"int\" },\n" +
-      "    { \"name\": \"col5\", \"type\": \"int\" },\n" +
-      "    { \"name\": \"col6\", \"type\": \"long\" },\n" +
-      "    { \"name\": \"col7\", \"type\": \"float\" },\n" +
-      "    { \"name\": \"col8\", \"type\": \"double\" },\n" +
-      "    { \"name\": \"col9\", \"type\": \"string\" },\n" +
-      "    { \"name\": \"col10\", \"type\": \"bytes\" },\n" +
-      "    { \"name\": \"col11\", \"type\": \"bytes\" },\n" +
-      "    { \"name\": \"col12\", \"type\": \"null\" }\n" +
-      "  ]\n" +
-      "}\n";
-
-  private StoreType storeType;
-  private boolean splitable;
-  private boolean statsable;
-  private Path testDir;
-  private FileSystem fs;
-
-  public TestStorages(StoreType type, boolean splitable, boolean statsable) throws IOException {
-    this.storeType = type;
-    this.splitable = splitable;
-    this.statsable = statsable;
-
-    conf = new TajoConf();
-    conf.setBoolVar(ConfVars.STORAGE_MANAGER_VERSION_2, true);
-
-    if (storeType == StoreType.RCFILE) {
-      conf.setInt(RCFile.RECORD_INTERVAL_CONF_STR, 100);
-    }
-
-
-    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
-    fs = testDir.getFileSystem(conf);
-  }
-
-  @Parameterized.Parameters
-  public static Collection<Object[]> generateParameters() {
-    return Arrays.asList(new Object[][]{
-        {StoreType.CSV, true, true},
-        {StoreType.RCFILE, true, true},
-        {StoreType.TREVNI, false, true},
-        {StoreType.PARQUET, false, false},
-        {StoreType.AVRO, false, false},
-        {StoreType.RAW, false, false},
-    });
-  }
-
-	@Test
-  public void testSplitable() throws IOException {
-    if (splitable) {
-      Schema schema = new Schema();
-      schema.addColumn("id", Type.INT4);
-      schema.addColumn("age", Type.INT8);
-
-      TableMeta meta = CatalogUtil.newTableMeta(storeType);
-      meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
-      Path tablePath = new Path(testDir, "Splitable.data");
-      Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
-      appender.enableStats();
-      appender.init();
-      int tupleNum = 10000;
-      VTuple vTuple;
-
-      for(int i = 0; i < tupleNum; i++) {
-        vTuple = new VTuple(2);
-        vTuple.put(0, DatumFactory.createInt4(i + 1));
-        vTuple.put(1, DatumFactory.createInt8(25l));
-        appender.addTuple(vTuple);
-      }
-      appender.close();
-      TableStats stat = appender.getStats();
-      assertEquals(tupleNum, stat.getNumRows().longValue());
-
-      FileStatus status = fs.getFileStatus(tablePath);
-      long fileLen = status.getLen();
-      long randomNum = (long) (Math.random() * fileLen) + 1;
-
-      FileFragment[] tablets = new FileFragment[2];
-      tablets[0] = new FileFragment("Splitable", tablePath, 0, randomNum);
-      tablets[1] = new FileFragment("Splitable", tablePath, randomNum, (fileLen - randomNum));
-
-      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[0], schema);
-      scanner.init();
-      int tupleCnt = 0;
-      while (scanner.next() != null) {
-        tupleCnt++;
-      }
-      scanner.close();
-
-      scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, tablets[1], schema);
-      scanner.init();
-      while (scanner.next() != null) {
-        tupleCnt++;
-      }
-      scanner.close();
-
-      assertEquals(tupleNum, tupleCnt);
-    }
-	}
-
-  @Test
-  public void testProjection() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("id", Type.INT4);
-    schema.addColumn("age", Type.INT8);
-    schema.addColumn("score", Type.FLOAT4);
-
-    TableMeta meta = CatalogUtil.newTableMeta(storeType);
-    meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
-    if (storeType == StoreType.AVRO) {
-      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
-                     TEST_PROJECTION_AVRO_SCHEMA);
-    }
-
-    Path tablePath = new Path(testDir, "testProjection.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.init();
-    int tupleNum = 10000;
-    VTuple vTuple;
-
-    for(int i = 0; i < tupleNum; i++) {
-      vTuple = new VTuple(3);
-      vTuple.put(0, DatumFactory.createInt4(i + 1));
-      vTuple.put(1, DatumFactory.createInt8(i + 2));
-      vTuple.put(2, DatumFactory.createFloat4(i + 3));
-      appender.addTuple(vTuple);
-    }
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    FileFragment fragment = new FileFragment("testReadAndWrite", tablePath, 0, status.getLen());
-
-    Schema target = new Schema();
-    target.addColumn("age", Type.INT8);
-    target.addColumn("score", Type.FLOAT4);
-    Scanner scanner = StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment, target);
-    scanner.init();
-    int tupleCnt = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
-      if (storeType == StoreType.RCFILE
-          || storeType == StoreType.TREVNI
-          || storeType == StoreType.CSV
-          || storeType == StoreType.PARQUET
-          || storeType == StoreType.AVRO) {
-        assertTrue(tuple.get(0) == null || tuple.get(0) instanceof NullDatum);
-      }
-      assertTrue(tupleCnt + 2 == tuple.get(1).asInt8());
-      assertTrue(tupleCnt + 3 == tuple.get(2).asFloat4());
-      tupleCnt++;
-    }
-    scanner.close();
-
-    assertEquals(tupleNum, tupleCnt);
-  }
-
-  @Test
-  public void testVariousTypes() throws IOException {
-    Schema schema = new Schema();
-    schema.addColumn("col1", Type.BOOLEAN);
-    schema.addColumn("col2", Type.BIT);
-    schema.addColumn("col3", Type.CHAR, 7);
-    schema.addColumn("col4", Type.INT2);
-    schema.addColumn("col5", Type.INT4);
-    schema.addColumn("col6", Type.INT8);
-    schema.addColumn("col7", Type.FLOAT4);
-    schema.addColumn("col8", Type.FLOAT8);
-    schema.addColumn("col9", Type.TEXT);
-    schema.addColumn("col10", Type.BLOB);
-    schema.addColumn("col11", Type.INET4);
-    schema.addColumn("col12", Type.NULL_TYPE);
-
-    KeyValueSet options = new KeyValueSet();
-    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
-    meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
-    if (storeType == StoreType.AVRO) {
-      meta.putOption(StorageConstants.AVRO_SCHEMA_LITERAL,
-                     TEST_VARIOUS_TYPES_AVRO_SCHEMA);
-    }
-
-    Path tablePath = new Path(testDir, "testVariousTypes.data");
-    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
-    appender.init();
-
-    Tuple tuple = new VTuple(12);
-    tuple.put(new Datum[] {
-        DatumFactory.createBool(true),
-        DatumFactory.createBit((byte) 0x99),
-        DatumFactory.createChar("hyunsik"),
-        DatumFactory.createInt2((short) 17),
-        DatumFactory.createInt4(59),
-        DatumFactory.createInt8(23l),
-        DatumFactory.createFloat4(77.9f),
-        DatumFactory.createFloat8(271.9f),
-        DatumFactory.createText("hyunsik"),
-        DatumFactory.createBlob("hyunsik".getBytes()),
-        DatumFactory.createInet4("192.168.0.1"),
-        NullDatum.get()
-    });
-    appender.addTuple(tuple);
-    appender.flush();
-    appender.close();
-
-    FileStatus status = fs.getFileStatus(tablePath);
-    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
-    Scanner scanner =  StorageManagerFactory.getStorageManager(conf).getScanner(meta, schema, fragment);
-    scanner.init();
-
-    Tuple retrieved;
-    while ((retrieved=scanner.next()) != null) {
-      for (int i = 0; i < tuple.size(); i++) {
-        assertEquals(tuple.get(i), retrieved.get(i));
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/0162270b/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index a1cc2ff..6bfc902 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -21,23 +21,6 @@
 
 <configuration>
   <property>
-    <name>tajo.storage.manager.v2</name>
-    <value>false</value>
-  </property>
-
-  <property>
-    <name>tajo.storage.manager.maxReadBytes</name>
-    <value>8388608</value>
-    <description></description>
-  </property>
-
-  <property>
-    <name>tajo.storage.manager.concurrency.perDisk</name>
-    <value>1</value>
-    <description></description>
-  </property>
-
-  <property>
     <name>fs.s3.impl</name>
     <value>org.apache.tajo.storage.s3.SmallBlockS3FileSystem</value>
   </property>
@@ -89,80 +72,40 @@
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.csv.class</name>
-    <value>org.apache.tajo.storage.v2.CSVFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.raw.class</name>
     <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.raw.class</name>
-    <value>org.apache.tajo.storage.RawFile$RawFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.rcfile.class</name>
     <value>org.apache.tajo.storage.rcfile.RCFile$RCFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.rcfile.class</name>
-    <value>org.apache.tajo.storage.v2.RCFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.rowfile.class</name>
     <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.rowfile.class</name>
-    <value>org.apache.tajo.storage.RowFile$RowFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.trevni.class</name>
     <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.trevni.class</name>
-    <value>org.apache.tajo.storage.trevni.TrevniScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.parquet.class</name>
     <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.parquet.class</name>
-    <value>org.apache.tajo.storage.parquet.ParquetScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.sequencefile.class</name>
     <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
   </property>
 
   <property>
-    <name>tajo.storage.scanner-handler.v2.sequencefile.class</name>
-    <value>org.apache.tajo.storage.sequencefile.SequenceFileScanner</value>
-  </property>
-
-  <property>
     <name>tajo.storage.scanner-handler.avro.class</name>
     <value>org.apache.tajo.storage.avro.AvroScanner</value>
   </property>
 
-  <property>
-    <name>tajo.storage.scanner-handler.v2.avro.class</name>
-    <value>org.apache.tajo.storage.avro.AvroScanner</value>
-  </property>
-
   <!--- Appender Handler -->
   <property>
     <name>tajo.storage.appender-handler</name>


Mime
View raw message