tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hyun...@apache.org
Subject [2/4] TAJO-178: Implements StorageManager for scanning asynchronously. (hyoungjunkim via hyunsik)
Date Fri, 13 Sep 2013 03:29:57 GMT
http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
index 79481fa..9907591 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManager.java
@@ -18,148 +18,27 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.net.util.Base64;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
-import org.apache.tajo.catalog.TableMetaImpl;
-import org.apache.tajo.catalog.proto.CatalogProtos.TableProto;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.conf.TajoConf.ConfVars;
-import org.apache.tajo.util.Bytes;
-import org.apache.tajo.util.FileUtil;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * StorageManager
  */
-public class StorageManager {
-  private final Log LOG = LogFactory.getLog(StorageManager.class);
+public class StorageManager extends AbstractStorageManager {
 
-  private final TajoConf conf;
-  private final FileSystem fs;
-  private final Path baseDir;
-  private final Path tableBaseDir;
-  private final boolean blocksMetadataEnabled;
-
-  /**
-   * Cache of scanner handlers for each storage type.
-   */
-  private static final Map<String, Class<? extends FileScanner>> SCANNER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends FileScanner>>();
-
-  /**
-   * Cache of appender handlers for each storage type.
-   */
-  private static final Map<String, Class<? extends FileAppender>> APPENDER_HANDLER_CACHE
-      = new ConcurrentHashMap<String, Class<? extends FileAppender>>();
-
-  /**
-   * Cache of constructors for each class. Pins the classes so they
-   * can't be garbage collected until ReflectionUtils can be collected.
-   */
-  private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE =
-      new ConcurrentHashMap<Class<?>, Constructor<?>>();
-
-
-  public StorageManager(TajoConf conf) throws IOException {
-    this.conf = conf;
-    this.baseDir = new Path(conf.getVar(ConfVars.ROOT_DIR));
-    this.tableBaseDir = new Path(this.baseDir, TajoConstants.WAREHOUSE_DIR_NAME);
-    this.fs = baseDir.getFileSystem(conf);
-    this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED,
-        DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
-    if (!this.blocksMetadataEnabled) {
-      LOG.warn("does not support block metadata. ('dfs.datanode.hdfs-blocks-metadata.enabled')");
-    }
-  }
-
-  public static StorageManager get(TajoConf conf) throws IOException {
-    return new StorageManager(conf);
-  }
-
-  public static StorageManager get(TajoConf conf, String dataRoot)
-      throws IOException {
-    conf.setVar(ConfVars.ROOT_DIR, dataRoot);
-    return new StorageManager(conf);
-  }
-
-  public static StorageManager get(TajoConf conf, Path dataRoot)
-      throws IOException {
-    conf.setVar(ConfVars.ROOT_DIR, dataRoot.toString());
-    return new StorageManager(conf);
-  }
-
-  public FileSystem getFileSystem() {
-    return this.fs;
-  }
-
-  public Path getBaseDir() {
-    return this.baseDir;
-  }
-
-  public Path getTableBaseDir() {
-    return this.tableBaseDir;
-  }
-
-  public void delete(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    fs.delete(tablePath, true);
+  protected StorageManager(TajoConf conf) throws IOException {
+    super(conf);
   }
 
-  public boolean exists(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    return fileSystem.exists(path);
-  }
-
-  /**
-   * This method deletes only data contained in the given path.
-   *
-   * @param path The path in which data are deleted.
-   * @throws IOException
-   */
-  public void deleteData(Path path) throws IOException {
-    FileSystem fileSystem = path.getFileSystem(conf);
-    FileStatus[] fileLists = fileSystem.listStatus(path);
-    for (FileStatus status : fileLists) {
-      fileSystem.delete(status.getPath(), true);
-    }
-  }
-
-  public Path getTablePath(String tableName) {
-    return new Path(tableBaseDir, tableName);
-  }
-
-  public static Scanner getScanner(Configuration conf, TableMeta meta, Path path)
-      throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    FileStatus status = fs.getFileStatus(path);
-    Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
-    return getScanner(conf, meta, fragment);
-  }
-
-  public static Scanner getScanner(Configuration conf, TableMeta meta, Fragment fragment)
-      throws IOException {
-    return getScanner(conf, meta, fragment, meta.getSchema());
-  }
-
-  public static Scanner getScanner(Configuration conf, TableMeta meta, Fragment fragment,
-                                   Schema target)
-      throws IOException {
+  @Override
+  public Scanner getScanner(TableMeta meta, Fragment fragment,
+                                   Schema target) throws IOException {
     Scanner scanner;
 
-    Class<? extends FileScanner> scannerClass;
+    Class<? extends Scanner> scannerClass;
 
     String handlerName = meta.getStoreType().name().toLowerCase();
     scannerClass = SCANNER_HANDLER_CACHE.get(handlerName);
@@ -167,7 +46,7 @@ public class StorageManager {
       scannerClass = conf.getClass(
           String.format("tajo.storage.scanner-handler.%s.class",
               meta.getStoreType().name().toLowerCase()), null,
-          FileScanner.class);
+          Scanner.class);
       SCANNER_HANDLER_CACHE.put(handlerName, scannerClass);
     }
 
@@ -182,525 +61,4 @@ public class StorageManager {
 
     return scanner;
   }
-
-  public static Appender getAppender(Configuration conf, TableMeta meta, Path path)
-      throws IOException {
-    Appender appender;
-
-    Class<? extends FileAppender> appenderClass;
-
-    String handlerName = meta.getStoreType().name().toLowerCase();
-    appenderClass = APPENDER_HANDLER_CACHE.get(handlerName);
-    if (appenderClass == null) {
-      appenderClass = conf.getClass(
-          String.format("tajo.storage.appender-handler.%s.class",
-              meta.getStoreType().name().toLowerCase()), null,
-          FileAppender.class);
-      APPENDER_HANDLER_CACHE.put(handlerName, appenderClass);
-    }
-
-    if (appenderClass == null) {
-      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
-    }
-
-    appender = newAppenderInstance(appenderClass, conf, meta, path);
-
-    return appender;
-  }
-
-
-  public TableMeta getTableMeta(Path tablePath) throws IOException {
-    TableMeta meta;
-
-    FileSystem fs = tablePath.getFileSystem(conf);
-    Path tableMetaPath = new Path(tablePath, ".meta");
-    if (!fs.exists(tableMetaPath)) {
-      throw new FileNotFoundException(".meta file not found in " + tablePath.toString());
-    }
-
-    FSDataInputStream tableMetaIn = fs.open(tableMetaPath);
-
-    TableProto tableProto = (TableProto) FileUtil.loadProto(tableMetaIn,
-        TableProto.getDefaultInstance());
-    meta = new TableMetaImpl(tableProto);
-
-    return meta;
-  }
-
-  public Fragment[] split(String tableName) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  public Fragment[] split(String tableName, long fragmentSize) throws IOException {
-    Path tablePath = new Path(tableBaseDir, tableName);
-    return split(tableName, tablePath, fragmentSize);
-  }
-
-  public Fragment[] splitBroadcastTable(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    TableMeta meta = getTableMeta(tablePath);
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      tablet = new Fragment(tablePath.getName(), file.getPath(), meta, 0, file.getLen());
-      listTablets.add(tablet);
-    }
-
-    Fragment[] tablets = new Fragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public Fragment[] split(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize());
-  }
-
-  public Fragment[] split(String tableName, Path tablePath) throws IOException {
-    return split(tableName, tablePath, fs.getDefaultBlockSize());
-  }
-
-  private Fragment[] split(String tableName, Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    TableMeta meta = getTableMeta(tablePath);
-    long defaultBlockSize = size;
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
-      } else {
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
-      }
-    }
-
-    Fragment[] tablets = new Fragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public static Fragment[] splitNG(Configuration conf, String tableName, TableMeta meta,
-                                   Path tablePath, long size)
-      throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-
-    long defaultBlockSize = size;
-    List<Fragment> listTablets = new ArrayList<Fragment>();
-    Fragment tablet;
-
-    FileStatus[] fileLists = fs.listStatus(tablePath);
-    for (FileStatus file : fileLists) {
-      long remainFileSize = file.getLen();
-      long start = 0;
-      if (remainFileSize > defaultBlockSize) {
-        while (remainFileSize > defaultBlockSize) {
-          tablet = new Fragment(tableName, file.getPath(), meta, start, defaultBlockSize);
-          listTablets.add(tablet);
-          start += defaultBlockSize;
-          remainFileSize -= defaultBlockSize;
-        }
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, start, remainFileSize));
-      } else {
-        listTablets.add(new Fragment(tableName, file.getPath(), meta, 0, remainFileSize));
-      }
-    }
-
-    Fragment[] tablets = new Fragment[listTablets.size()];
-    listTablets.toArray(tablets);
-
-    return tablets;
-  }
-
-  public long calculateSize(Path tablePath) throws IOException {
-    FileSystem fs = tablePath.getFileSystem(conf);
-    long totalSize = 0;
-
-    if (fs.exists(tablePath)) {
-      for (FileStatus status : fs.listStatus(tablePath)) {
-        totalSize += status.getLen();
-      }
-    }
-
-    return totalSize;
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // FileInputFormat Area
-  /////////////////////////////////////////////////////////////////////////////
-
-  private static final PathFilter hiddenFileFilter = new PathFilter() {
-    public boolean accept(Path p) {
-      String name = p.getName();
-      return !name.startsWith("_") && !name.startsWith(".");
-    }
-  };
-
-  /**
-   * Proxy PathFilter that accepts a path only if all filters given in the
-   * constructor do. Used by the listPaths() to apply the built-in
-   * hiddenFileFilter together with a user provided one (if any).
-   */
-  private static class MultiPathFilter implements PathFilter {
-    private List<PathFilter> filters;
-
-    public MultiPathFilter(List<PathFilter> filters) {
-      this.filters = filters;
-    }
-
-    public boolean accept(Path path) {
-      for (PathFilter filter : filters) {
-        if (!filter.accept(path)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
-  /**
-   * List input directories.
-   * Subclasses may override to, e.g., select only files matching a regular
-   * expression.
-   *
-   * @return array of FileStatus objects
-   * @throws IOException if zero items.
-   */
-  protected List<FileStatus> listStatus(Path path) throws IOException {
-    List<FileStatus> result = new ArrayList<FileStatus>();
-    Path[] dirs = new Path[]{path};
-    if (dirs.length == 0) {
-      throw new IOException("No input paths specified in job");
-    }
-
-    List<IOException> errors = new ArrayList<IOException>();
-
-    // creates a MultiPathFilter with the hiddenFileFilter and the
-    // user provided one (if any).
-    List<PathFilter> filters = new ArrayList<PathFilter>();
-    filters.add(hiddenFileFilter);
-
-    PathFilter inputFilter = new MultiPathFilter(filters);
-
-    for (int i = 0; i < dirs.length; ++i) {
-      Path p = dirs[i];
-
-      FileSystem fs = p.getFileSystem(conf);
-      FileStatus[] matches = fs.globStatus(p, inputFilter);
-      if (matches == null) {
-        errors.add(new IOException("Input path does not exist: " + p));
-      } else if (matches.length == 0) {
-        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
-      } else {
-        for (FileStatus globStat : matches) {
-          if (globStat.isDirectory()) {
-            for (FileStatus stat : fs.listStatus(globStat.getPath(),
-                inputFilter)) {
-              result.add(stat);
-            }
-          } else {
-            result.add(globStat);
-          }
-        }
-      }
-    }
-
-    if (!errors.isEmpty()) {
-      throw new InvalidInputException(errors);
-    }
-    LOG.info("Total input paths to process : " + result.size());
-    return result;
-  }
-
-  /**
-   * Get the lower bound on split size imposed by the format.
-   *
-   * @return the number of bytes of the minimal split for this format
-   */
-  protected long getFormatMinSplitSize() {
-    return 1;
-  }
-
-  /**
-   * Is the given filename splitable? Usually, true, but if the file is
-   * stream compressed, it will not be.
-   * <p/>
-   * <code>FileInputFormat</code> implementations can override this and return
-   * <code>false</code> to ensure that individual input files are never split-up
-   * so that Mappers process entire files.
-   *
-   * @param filename the file name to check
-   * @return is this file isSplittable?
-   */
-  protected boolean isSplittable(TableMeta meta, Path filename) throws IOException {
-    Scanner scanner = getScanner(conf, meta, filename);
-    return scanner.isSplittable();
-  }
-
-  @Deprecated
-  protected long computeSplitSize(long blockSize, long minSize,
-                                  long maxSize) {
-    return Math.max(minSize, Math.min(maxSize, blockSize));
-  }
-
-  @Deprecated
-  private static final double SPLIT_SLOP = 1.1;   // 10% slop
-
-  @Deprecated
-  protected int getBlockIndex(BlockLocation[] blkLocations,
-                              long offset) {
-    for (int i = 0; i < blkLocations.length; i++) {
-      // is the offset inside this block?
-      if ((blkLocations[i].getOffset() <= offset) &&
-          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
-        return i;
-      }
-    }
-    BlockLocation last = blkLocations[blkLocations.length - 1];
-    long fileLength = last.getOffset() + last.getLength() - 1;
-    throw new IllegalArgumentException("Offset " + offset +
-        " is outside of file (0.." +
-        fileLength + ")");
-  }
-
-  /**
-   * A factory that makes the split for this class. It can be overridden
-   * by sub-classes to make sub-types
-   */
-  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, long start, long length) {
-    return new Fragment(fragmentId, file, meta, start, length);
-  }
-
-  protected Fragment makeSplit(String fragmentId, TableMeta meta, Path file, BlockLocation blockLocation,
-                               int[] diskIds) throws IOException {
-    return new Fragment(fragmentId, file, meta, blockLocation, diskIds);
-  }
-
-  // for Non Splittable. eg, compressed gzip TextFile
-  protected Fragment makeNonSplit(String fragmentId, TableMeta meta, Path file, long start, long length,
-                                  BlockLocation[] blkLocations) throws IOException {
-
-    Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>();
-    for (BlockLocation blockLocation : blkLocations) {
-      for (String host : blockLocation.getHosts()) {
-        if (hostsBlockMap.containsKey(host)) {
-          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
-        } else {
-          hostsBlockMap.put(host, 1);
-        }
-      }
-    }
-
-    List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet());
-    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
-
-      @Override
-      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
-        return v1.getValue().compareTo(v2.getValue());
-      }
-    });
-
-    String[] hosts = new String[blkLocations[0].getHosts().length];
-    int[] hostsBlockCount = new int[blkLocations[0].getHosts().length];
-
-    for (int i = 0; i < hosts.length; i++) {
-      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
-      hosts[i] = entry.getKey();
-      hostsBlockCount[i] = entry.getValue();
-    }
-    return new Fragment(fragmentId, file, meta, start, length, hosts, hostsBlockCount);
-  }
-
-  /**
-   * Get the maximum split size.
-   *
-   * @return the maximum number of bytes a split can include
-   */
-  @Deprecated
-  public static long getMaxSplitSize() {
-    // TODO - to be configurable
-    return 536870912L;
-  }
-
-  /**
-   * Get the minimum split size
-   *
-   * @return the minimum number of bytes that can be in a split
-   */
-  @Deprecated
-  public static long getMinSplitSize() {
-    // TODO - to be configurable
-    return 67108864L;
-  }
-
-  /**
-   * Get Disk Ids by Volume Bytes
-   */
-  private int[] getDiskIds(VolumeId[] volumeIds) {
-    int[] diskIds = new int[volumeIds.length];
-    for (int i = 0; i < volumeIds.length; i++) {
-      int diskId = -1;
-      if (volumeIds[i] != null && volumeIds[i].isValid()) {
-        String volumeIdString = volumeIds[i].toString();
-        byte[] volumeIdBytes = Base64.decodeBase64(volumeIdString);
-
-        if (volumeIdBytes.length == 4) {
-          diskId = Bytes.toInt(volumeIdBytes);
-        } else if (volumeIdBytes.length == 1) {
-          diskId = (int) volumeIdBytes[0];  // support hadoop-2.0.2
-        }
-      }
-      diskIds[i] = diskId;
-    }
-    return diskIds;
-  }
-
-  /**
-   * Generate the map of host and make them into Volume Ids.
-   *
-   */
-  private Map<String, Set<Integer>> getVolumeMap(List<Fragment> frags) {
-    Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>();
-    for (Fragment frag : frags) {
-      String[] hosts = frag.getHosts();
-      int[] diskIds = frag.getDiskIds();
-      for (int i = 0; i < hosts.length; i++) {
-        Set<Integer> volumeList = volumeMap.get(hosts[i]);
-        if (volumeList == null) {
-          volumeList = new HashSet<Integer>();
-          volumeMap.put(hosts[i], volumeList);
-        }
-
-        if (diskIds.length > 0 && diskIds[i] > -1) {
-          volumeList.add(diskIds[i]);
-        }
-      }
-    }
-
-    return volumeMap;
-  }
-  /**
-   * Generate the list of files and make them into FileSplits.
-   *
-   * @throws IOException
-   */
-  public List<Fragment> getSplits(String tableName, TableMeta meta, Path inputPath) throws IOException {
-    // generate splits'
-
-    List<Fragment> splits = new ArrayList<Fragment>();
-    List<FileStatus> files = listStatus(inputPath);
-    FileSystem fs = inputPath.getFileSystem(conf);
-    for (FileStatus file : files) {
-      Path path = file.getPath();
-      long length = file.getLen();
-      if (length > 0) {
-        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
-        boolean splittable = isSplittable(meta, path);
-        if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
-          // supported disk volume
-          BlockStorageLocation[] blockStorageLocations = ((DistributedFileSystem) fs)
-              .getFileBlockStorageLocations(Arrays.asList(blkLocations));
-          if (splittable) {
-            for (BlockStorageLocation blockStorageLocation : blockStorageLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockStorageLocation, getDiskIds(blockStorageLocation
-                  .getVolumeIds())));
-            }
-          } else { // Non splittable
-            splits.add(makeNonSplit(tableName, meta, path, 0, length, blockStorageLocations));
-          }
-
-        } else {
-          if (splittable) {
-            for (BlockLocation blockLocation : blkLocations) {
-              splits.add(makeSplit(tableName, meta, path, blockLocation, null));
-            }
-          } else { // Non splittable
-            splits.add(makeNonSplit(tableName, meta, path, 0, length, blkLocations));
-          }
-        }
-      } else {
-        //for zero length files
-        splits.add(makeSplit(tableName, meta, path, 0, length));
-      }
-    }
-
-    LOG.info("Total # of splits: " + splits.size());
-    return splits;
-  }
-
-  private class InvalidInputException extends IOException {
-    public InvalidInputException(
-        List<IOException> errors) {
-    }
-  }
-
-  private static final Class<?>[] DEFAULT_SCANNER_PARAMS = {
-      Configuration.class,
-      TableMeta.class,
-      Fragment.class
-  };
-
-  private static final Class<?>[] DEFAULT_APPENDER_PARAMS = {
-      Configuration.class,
-      TableMeta.class,
-      Path.class
-  };
-
-  /**
-   * create a scanner instance.
-   */
-  public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, TableMeta meta,
-                                         Fragment fragment) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, meta, fragment});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
-
-  /**
-   * create a scanner instance.
-   */
-  public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, TableMeta meta,
-                                          Path path) {
-    T result;
-    try {
-      Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
-      if (meth == null) {
-        meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS);
-        meth.setAccessible(true);
-        CONSTRUCTOR_CACHE.put(theClass, meth);
-      }
-      result = meth.newInstance(new Object[]{conf, meta, path});
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    return result;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
new file mode 100644
index 0000000..8b7c2ca
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/StorageManagerFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.v2.StorageManagerV2;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+public class StorageManagerFactory {
+  private static Map<String, AbstractStorageManager> storageManagers =
+      new HashMap<String, AbstractStorageManager>();
+
+  public static AbstractStorageManager getStorageManager(TajoConf conf) throws IOException {
+    return getStorageManager(conf, null);
+  }
+
+  public static synchronized AbstractStorageManager getStorageManager (
+      TajoConf conf, Path dataRoot) throws IOException {
+    return getStorageManager(conf, dataRoot, conf.getBoolean("tajo.storage.manager.v2", false));
+  }
+
+  private static synchronized AbstractStorageManager getStorageManager (
+      TajoConf conf, Path dataRoot, boolean v2) throws IOException {
+    if(dataRoot != null) {
+      conf.setVar(TajoConf.ConfVars.ROOT_DIR, dataRoot.toString());
+    }
+
+    URI uri;
+    if(dataRoot == null) {
+      uri = FileSystem.get(conf).getUri();
+    } else {
+      uri = dataRoot.toUri();
+    }
+    String key = "file".equals(uri.getScheme()) ? "file" : uri.getScheme() + uri.getHost() + uri.getPort();
+
+    if(v2) {
+      key += "_v2";
+    }
+
+    if(storageManagers.containsKey(key)) {
+      return storageManagers.get(key);
+    } else {
+      AbstractStorageManager storageManager = null;
+
+      if(v2) {
+        storageManager = new StorageManagerV2(conf);
+      } else {
+        storageManager = new StorageManager(conf);
+      }
+
+      storageManagers.put(key, storageManager);
+
+      return storageManager;
+    }
+  }
+
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Fragment fragment, Schema schema) throws IOException {
+    return (SeekableScanner)getStorageManager(conf, null, false).getScanner(meta, fragment, schema);
+  }
+
+  public static synchronized SeekableScanner getSeekableScanner(
+      TajoConf conf, TableMeta meta, Path path) throws IOException {
+
+    FileSystem fs = path.getFileSystem(conf);
+    FileStatus status = fs.getFileStatus(path);
+    Fragment fragment = new Fragment(path.getName(), path, meta, 0, status.getLen());
+
+    return getSeekableScanner(conf, meta, fragment, fragment.getSchema());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
new file mode 100644
index 0000000..f34fa84
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/CSVFileScanner.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.*;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.exception.UnsupportedException;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.SeekableScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.compress.CodecPool;
+import org.apache.tajo.storage.json.StorageGsonHelper;
+
+/**
+ * Scanner for delimiter-separated text files that hold one record per line.
+ *
+ * <p>The fragment is read in pages of up to 256 KB. Each page is split into lines on
+ * {@code '\n'}; a line cut off by the page boundary is kept as {@code tail} and
+ * prepended to the next page.
+ */
+public class CSVFileScanner extends FileScannerV2 {
+  public static final String DELIMITER = "csvfile.delimiter";
+  public static final String DELIMITER_DEFAULT = "|";
+  public static final byte LF = '\n';
+  private static final Log LOG = LogFactory.getLog(CSVFileScanner.class);
+
+  private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
+  private int bufSize;
+  private char delimiter;
+  private FSDataInputStream fis;
+  private InputStream is; // decompressed stream
+  private CompressionCodecFactory factory;
+  private CompressionCodec codec;
+  private Decompressor decompressor;
+  private Seekable filePosition;
+  private boolean splittable = true;
+  private long startOffset, length;
+  private byte[] buf = null;
+  private String[] tuples = null;
+  private long[] tupleOffsets = null;
+  private int currentIdx = 0, validIdx = 0;
+  private byte[] tail = null;
+  private long pageStart = -1;
+  private long prevTailLen = -1;
+  private int[] targetColumnIndexes;
+  private boolean eof = false;
+
+  /**
+   * Looks up the compression codec for the fragment's path; a file compressed with a
+   * codec that is not a {@link SplittableCompressionCodec} is marked as not splittable.
+   */
+  public CSVFileScanner(Configuration conf, final TableMeta meta,
+                    final Fragment fragment) throws IOException {
+    super(conf, meta, fragment);
+    factory = new CompressionCodecFactory(conf);
+    codec = factory.getCodec(fragment.getPath());
+    if (isCompress() && !(codec instanceof SplittableCompressionCodec)) {
+      splittable = false;
+    }
+  }
+
+  /** Sets the page size and reads the delimiter option, then delegates to the superclass. */
+  @Override
+  public void init() throws IOException {
+    // Buffer size, Delimiter
+    this.bufSize = DEFAULT_BUFFER_SIZE;
+    String delim  = fragment.getMeta().getOption(DELIMITER, DELIMITER_DEFAULT);
+    this.delimiter = delim.charAt(0);
+
+    super.init();
+  }
+
+  /**
+   * Opens the fragment, positions the stream on the first complete line and reads the
+   * first page. Does nothing unless the {@code firstSchdeuled} flag is still set.
+   */
+  @Override
+  protected void initFirstScan() throws IOException {
+    if(!firstSchdeuled) {
+      return;
+    }
+    firstSchdeuled = false;
+
+    // Fragment information
+    fis = fs.open(fragment.getPath(), 128 * 1024);
+    startOffset = fragment.getStartOffset();
+    length = fragment.getLength();
+
+    if (startOffset > 0) {
+      startOffset--; // prev line feed
+    }
+
+    if (codec != null) {
+      decompressor = CodecPool.getDecompressor(codec);
+      if (codec instanceof SplittableCompressionCodec) {
+        SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+            fis, decompressor, startOffset, startOffset + length,
+            SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+        startOffset = cIn.getAdjustedStart();
+        length = cIn.getAdjustedEnd() - startOffset;
+        filePosition = cIn;
+        is = cIn;
+      } else {
+        is = new DataInputStream(codec.createInputStream(fis, decompressor));
+      }
+    } else {
+      fis.seek(startOffset);
+      filePosition = fis;
+      is = fis;
+    }
+
+    tuples = new String[0];
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+
+    targetColumnIndexes = new int[targets.length];
+    for (int i = 0; i < targets.length; i++) {
+      targetColumnIndexes[i] = schema.getColumnIdByName(targets[i].getColumnName());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + length +
+          "," + fs.getFileStatus(fragment.getPath()).getLen());
+    }
+
+    if (startOffset != 0) {
+      // Skip the (possibly partial) line the fragment starts in.
+      int rbyte;
+      while ((rbyte = is.read()) != LF) {
+        if(rbyte == -1) break;
+      }
+    }
+
+    if (fragmentable() < 1) {
+      close();
+      return;
+    }
+    page();
+  }
+
+  /** Returns the number of bytes between the current position and the end of the fragment. */
+  private long fragmentable() throws IOException {
+    return startOffset + length - getFilePosition();
+  }
+
+  @Override
+  protected long getFilePosition() throws IOException {
+    long retVal;
+    if (filePosition != null) {
+      retVal = filePosition.getPos();
+    } else {
+      retVal = fis.getPos();
+    }
+    return retVal;
+  }
+
+  /**
+   * Reads the next page into {@code tuples}. When the stream is already exhausted it
+   * only sets {@code eof} and leaves {@code tuples} untouched.
+   */
+  private void page() throws IOException {
+    // Index initialization
+    currentIdx = 0;
+
+    // Buffer size set
+    if (isSplittable() && fragmentable() < DEFAULT_BUFFER_SIZE) {
+      bufSize = (int) fragmentable();
+    }
+
+    if (this.tail == null || this.tail.length == 0) {
+      this.pageStart = getFilePosition();
+      this.prevTailLen = 0;
+    } else {
+      this.pageStart = getFilePosition() - this.tail.length;
+      this.prevTailLen = this.tail.length;
+    }
+
+    // Read
+    int rbyte;
+    buf = new byte[bufSize];
+    rbyte = is.read(buf);
+
+    if (rbyte < 0) {
+      eof = true; // EOF
+      return;
+    }
+
+    if (prevTailLen == 0) {
+      tail = new byte[0];
+      tuples = StringUtils.split(new String(buf, 0, rbyte), (char) LF);
+    } else {
+      tuples = StringUtils.split(new String(tail)
+          + new String(buf, 0, rbyte), (char) LF);
+      tail = null;
+    }
+
+    // Check tail
+    if ((char) buf[rbyte - 1] != LF) {
+      if ((fragmentable() < 1 || rbyte != bufSize)) {
+        // The page ends mid-line at the end of the fragment (or after a short read):
+        // read on to the next line feed. Stop at end of stream as well, because the last
+        // line of a file need not be LF-terminated, and collect the bytes in a growable
+        // buffer so that a long line cannot overrun a fixed-size array.
+        ByteArrayOutputStream rest = new ByteArrayOutputStream();
+        int b;
+        while ((b = is.read()) != -1 && b != LF) {
+          rest.write(b);
+        }
+
+        // Replace tuple
+        tuples[tuples.length - 1] = tuples[tuples.length - 1] + rest.toString();
+        validIdx = tuples.length;
+      } else {
+        tail = tuples[tuples.length - 1].getBytes();
+        validIdx = tuples.length - 1;
+      }
+    } else {
+      tail = new byte[0];
+      validIdx = tuples.length;
+    }
+
+    if(!isCompress()) makeTupleOffset();
+  }
+
+  /** Computes the file offset of every complete tuple in the current page. */
+  private void makeTupleOffset() {
+    long curTupleOffset = 0;
+    this.tupleOffsets = new long[this.validIdx];
+    for (int i = 0; i < this.validIdx; i++) {
+      this.tupleOffsets[i] = curTupleOffset + this.pageStart;
+      curTupleOffset += this.tuples[i].getBytes().length + 1;// tuple byte
+      // + 1byte
+      // line feed
+    }
+  }
+
+  /**
+   * Returns the next tuple, or {@code null} when the fragment is exhausted.
+   *
+   * <p>Any error raised while paging or parsing is logged and also reported as
+   * {@code null}; it is not rethrown.
+   */
+  protected Tuple getNextTuple() throws IOException {
+    try {
+      // Loop rather than test once: a page may yield no complete tuple (only line feeds,
+      // or a single line longer than the page), in which case the next page is needed.
+      while (currentIdx == validIdx) {
+        if (isSplittable() && fragmentable() < 1) {
+          close();
+          return null;
+        } else {
+          page();
+        }
+
+        if(eof){
+          close();
+          return null;
+        }
+      }
+
+      long offset = -1;
+      if(!isCompress()){
+        offset = this.tupleOffsets[currentIdx];
+      }
+
+      String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
+
+      int targetLen = targets.length;
+
+      VTuple tuple = new VTuple(columnNum);
+      Column field;
+      tuple.setOffset(offset);
+      for (int i = 0; i < targetLen; i++) {
+        field = targets[i];
+        int tid = targetColumnIndexes[i];
+        if (cells.length <= tid) {
+          tuple.put(tid, DatumFactory.createNullDatum());
+        } else {
+          String cell = cells[tid].trim();
+
+          if (cell.equals("")) {
+            tuple.put(tid, DatumFactory.createNullDatum());
+          } else {
+            switch (field.getDataType().getType()) {
+              case BOOLEAN:
+                tuple.put(tid, DatumFactory.createBool(cell));
+                break;
+              case BIT:
+                tuple.put(tid, DatumFactory.createBit(Base64.decodeBase64(cell)[0]));
+                break;
+              case CHAR:
+                // cell is already trimmed above
+                tuple.put(tid, DatumFactory.createChar(cell));
+                break;
+              case BLOB:
+                tuple.put(tid, DatumFactory.createBlob(Base64.decodeBase64(cell)));
+                break;
+              case INT2:
+                tuple.put(tid, DatumFactory.createInt2(cell));
+                break;
+              case INT4:
+                tuple.put(tid, DatumFactory.createInt4(cell));
+                break;
+              case INT8:
+                tuple.put(tid, DatumFactory.createInt8(cell));
+                break;
+              case FLOAT4:
+                tuple.put(tid, DatumFactory.createFloat4(cell));
+                break;
+              case FLOAT8:
+                tuple.put(tid, DatumFactory.createFloat8(cell));
+                break;
+              case TEXT:
+                tuple.put(tid, DatumFactory.createText(cell));
+                break;
+              case INET4:
+                tuple.put(tid, DatumFactory.createInet4(cell));
+                break;
+              case ARRAY:
+                Datum data = StorageGsonHelper.getInstance().fromJson(cell,
+                    Datum.class);
+                tuple.put(tid, data);
+                break;
+            }
+          }
+        }
+      }
+      return tuple;
+    } catch (Throwable t) {
+      LOG.error(t.getMessage(), t);
+    }
+    return null;
+  }
+
+  private boolean isCompress() {
+    return codec != null;
+  }
+
+  @Override
+  public void reset() throws IOException {
+    super.reset();
+  }
+
+  /**
+   * Closes the input stream (if one was opened), returns the decompressor to the pool
+   * and closes the superclass. Returns immediately if the {@code closed} flag is set.
+   */
+  @Override
+  public void close() throws IOException {
+    if(closed.get()) {
+      return;
+    }
+    try {
+      // The stream is only opened in initFirstScan(), so it may still be null here.
+      if (is != null) {
+        is.close();
+      }
+    } finally {
+      if (decompressor != null) {
+        CodecPool.returnDecompressor(decompressor);
+        decompressor = null;
+      }
+      tuples = null;
+      super.close();
+    }
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public void setSearchCondition(Object expr) {
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return splittable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
new file mode 100644
index 0000000..7802c91
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskDeviceInfo.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Describes a disk device by id and name and holds the mount points registered for it.
+ */
+public class DiskDeviceInfo {
+  private int id;
+  private String name;
+
+  private List<DiskMountInfo> mountInfos = new ArrayList<DiskMountInfo>();
+
+  /** Creates a device entry with the given id; the name is left unset. */
+  public DiskDeviceInfo(int id) {
+    this.id = id;
+  }
+
+  /** Returns the id given at construction. */
+  public int getId() {
+    return this.id;
+  }
+
+  /** Returns the device name, or {@code null} if {@link #setName} was never called. */
+  public String getName() {
+    return this.name;
+  }
+
+  /** Sets the device name. */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /** Renders this device as {@code "<id>,<name>"}. */
+  @Override
+  public String toString() {
+    return new StringBuilder().append(id).append(",").append(name).toString();
+  }
+
+  /** Appends a mount point to this device's list. */
+  public void addMountPath(DiskMountInfo diskMountInfo) {
+    this.mountInfos.add(diskMountInfo);
+  }
+
+  /** Returns the live list of mount points (not a copy). */
+  public List<DiskMountInfo> getMountInfos() {
+    return this.mountInfos;
+  }
+
+  /** Replaces the list of mount points with the given list. */
+  public void setMountInfos(List<DiskMountInfo> mountInfos) {
+    this.mountInfos = mountInfos;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
new file mode 100644
index 0000000..d55a6db
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskFileScanScheduler.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Scheduler thread that dispatches the file scanners queued for one disk device.
+ *
+ * <p>Queued scanners are started on {@link FileScanRunner} threads while fewer than
+ * {@code tajo.storage.manager.concurrency.perDisk} runners are active. A scanner that
+ * reports {@code isFetchProcessing()} is parked on a side queue instead and put back
+ * on the request queue by {@link FetchWaitingThread} after a short delay.
+ */
+public class DiskFileScanScheduler extends Thread {
+  private static final Log LOG = LogFactory.getLog(DiskFileScanScheduler.class);
+
+  /** Scanners waiting to be dispatched; modified while holding {@link #requestQueueMonitor}. */
+  private Queue<FileScannerV2> requestQueue = new LinkedList<FileScannerV2>();
+
+  /** Scanners parked while fetching; every access synchronizes on the queue itself. */
+  Queue<FileScannerV2> fetchingScanners = new LinkedList<FileScannerV2>();
+
+  private int scanConcurrency;
+
+  private AtomicInteger numOfRunningScanners = new AtomicInteger(0);
+
+  private Object requestQueueMonitor = new Object(); // c++ code style
+
+  private StorageManagerV2.StorgaeManagerContext smContext;
+
+  private DiskDeviceInfo diskDeviceInfo;
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private long totalScanCount = 0;
+
+  private FetchWaitingThread fetchWaitingThread;
+
+  /**
+   * Creates the scheduler for one disk and starts its {@link FetchWaitingThread}.
+   * The scheduler thread itself is not started here.
+   *
+   * @param smContext storage manager context; its configuration supplies the per-disk concurrency
+   * @param diskDeviceInfo the disk this scheduler serves
+   */
+  public DiskFileScanScheduler(
+      StorageManagerV2.StorgaeManagerContext smContext,
+      DiskDeviceInfo diskDeviceInfo) {
+    super("DiskFileScanner:" + diskDeviceInfo);
+    this.smContext = smContext;
+    this.diskDeviceInfo = diskDeviceInfo;
+    initScannerPool();
+    this.fetchWaitingThread = new FetchWaitingThread();
+    this.fetchWaitingThread.start();
+  }
+
+  /** Returns the id of the disk device this scheduler serves. */
+  public int getDiskId() {
+    return diskDeviceInfo.getId();
+  }
+
+  /**
+   * Dispatch loop; runs until {@link #stopScan()} is called or the thread is interrupted.
+   * The request-queue monitor is held for the whole loop and released only inside
+   * {@code wait()}.
+   */
+  public void run() {
+    synchronized (requestQueueMonitor) {
+      while(!stopped.get()) {
+        if(isAllScannerRunning()) {
+          try {
+            requestQueueMonitor.wait(2000);
+            continue;
+          } catch (InterruptedException e) {
+            break;
+          }
+        } else {
+          FileScannerV2 fileScanner = requestQueue.poll();
+          if(fileScanner == null) {
+            try {
+              requestQueueMonitor.wait(2000);
+              continue;
+            } catch (InterruptedException e) {
+              break;
+            }
+          }
+          if(fileScanner.isFetchProcessing()) {
+            // LinkedList is not thread-safe and FetchWaitingThread polls this queue
+            // under the same lock, so the add must happen inside the synchronized block.
+            synchronized(fetchingScanners) {
+              fetchingScanners.add(fileScanner);
+              fetchingScanners.notifyAll();
+            }
+          } else {
+            numOfRunningScanners.incrementAndGet();
+            FileScanRunner fileScanRunner = new FileScanRunner(
+                DiskFileScanScheduler.this, smContext,
+                fileScanner, requestQueueMonitor,
+                numOfRunningScanners);
+            totalScanCount++;
+            fileScanRunner.start();
+          }
+        }
+      }
+    }
+  }
+
+  /** Queues a scanner for dispatch and wakes the scheduler. */
+  protected void requestScanFile(FileScannerV2 fileScannerV2) {
+    synchronized (requestQueueMonitor) {
+      requestQueue.offer(fileScannerV2);
+      requestQueueMonitor.notifyAll();
+    }
+  }
+
+  /**
+   * Moves parked scanners back onto the request queue, pausing 10 ms before each
+   * re-queue.
+   */
+  public class FetchWaitingThread extends Thread {
+    public void run() {
+      while(!stopped.get()) {
+        FileScannerV2 scanner = null;
+        synchronized(fetchingScanners) {
+          scanner = fetchingScanners.poll();
+          if(scanner == null) {
+            try {
+              fetchingScanners.wait();
+              continue;
+            } catch (InterruptedException e) {
+              break;
+            }
+          }
+        }
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          break;
+        }
+        synchronized(requestQueueMonitor) {
+          requestQueue.offer(scanner);
+          requestQueueMonitor.notifyAll();
+        }
+      }
+    }
+  }
+
+  private void initScannerPool() {
+    // TODO finally implements heuristic, currently set with property
+    scanConcurrency = smContext.getConf().getInt("tajo.storage.manager.concurrency.perDisk", 1);
+  }
+
+  /** Returns the current number of queued requests (read without locking). */
+  public int getTotalQueueSize() {
+    return requestQueue.size();
+  }
+
+  /** Returns true when the number of running scanners has reached the configured concurrency. */
+  boolean isAllScannerRunning() {
+    return numOfRunningScanners.get() >= scanConcurrency;
+  }
+
+  /** Returns how many scanners this scheduler has started so far. */
+  public long getTotalScanCount() {
+    return totalScanCount;
+  }
+
+  /** Requests shutdown: sets the stop flag and interrupts both worker threads. */
+  public void stopScan() {
+    stopped.set(true);
+    if (fetchWaitingThread != null) {
+      fetchWaitingThread.interrupt();
+    }
+
+    this.interrupt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
new file mode 100644
index 0000000..d71154c
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskInfo.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+/**
+ * Mutable holder for the properties of one disk partition: id, partition name,
+ * mount path, capacity and used space.
+ */
+public class DiskInfo {
+  private int id;
+  private String partitionName;
+  private String mountPath;
+
+  private long capacity;
+  private long used;
+
+  /** Creates an entry with the given id and partition name; other fields keep their defaults. */
+  public DiskInfo(int id, String partitionName) {
+    this.id = id;
+    this.partitionName = partitionName;
+  }
+
+  /** Returns the id. */
+  public int getId() {
+    return this.id;
+  }
+
+  /** Sets the id. */
+  public void setId(int id) {
+    this.id = id;
+  }
+
+  /** Returns the partition name. */
+  public String getPartitionName() {
+    return this.partitionName;
+  }
+
+  /** Sets the partition name. */
+  public void setPartitionName(String partitionName) {
+    this.partitionName = partitionName;
+  }
+
+  /** Returns the mount path, or {@code null} if none has been set. */
+  public String getMountPath() {
+    return this.mountPath;
+  }
+
+  /** Sets the mount path. */
+  public void setMountPath(String mountPath) {
+    this.mountPath = mountPath;
+  }
+
+  /** Returns the capacity last set with {@link #setCapacity}. */
+  public long getCapacity() {
+    return this.capacity;
+  }
+
+  /** Sets the capacity. */
+  public void setCapacity(long capacity) {
+    this.capacity = capacity;
+  }
+
+  /** Returns the used amount last set with {@link #setUsed}. */
+  public long getUsed() {
+    return this.used;
+  }
+
+  /** Sets the used amount. */
+  public void setUsed(long used) {
+    this.used = used;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
new file mode 100644
index 0000000..d9b0dd2
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskMountInfo.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+/**
+ * A mount point belonging to a disk device.
+ *
+ * <p>The natural ordering puts the most specific mount path first: a path with
+ * more "/"-separated components sorts before one with fewer; at equal depth the
+ * longer path sorts first; at equal depth and length paths sort lexicographically.
+ */
+public class DiskMountInfo implements Comparable<DiskMountInfo> {
+  private String mountPath;
+
+  private long capacity;
+  private long used;
+
+  private int deviceId;
+
+  /**
+   * @param deviceId id of the disk device this mount point belongs to
+   * @param mountPath path at which the device is mounted
+   */
+  public DiskMountInfo(int deviceId, String mountPath) {
+    // deviceId has no setter, so it must be captured here for getDeviceId() to be meaningful.
+    this.deviceId = deviceId;
+    this.mountPath = mountPath;
+  }
+
+  /** Returns the mount path. */
+  public String getMountPath() {
+    return mountPath;
+  }
+
+  /** Sets the mount path. */
+  public void setMountPath(String mountPath) {
+    this.mountPath = mountPath;
+  }
+
+  /** Returns the capacity last set with {@link #setCapacity}. */
+  public long getCapacity() {
+    return capacity;
+  }
+
+  /** Sets the capacity. */
+  public void setCapacity(long capacity) {
+    this.capacity = capacity;
+  }
+
+  /** Returns the used amount last set with {@link #setUsed}. */
+  public long getUsed() {
+    return used;
+  }
+
+  /** Sets the used amount. */
+  public void setUsed(long used) {
+    this.used = used;
+  }
+
+  /** Returns the id of the device passed to the constructor. */
+  public int getDeviceId() {
+    return deviceId;
+  }
+
+  /**
+   * Orders mount points from most to least specific (deeper first, then longer
+   * first, then lexicographically).
+   */
+  @Override
+  public int compareTo(DiskMountInfo other) {
+    String path1 = mountPath;
+    String path2 = other.mountPath;
+
+    int byDepth = descending(depthOf(path1), depthOf(path2));
+    if (byDepth != 0) {
+      return byDepth;
+    }
+
+    // Compare against the OTHER path's length; comparing a length with itself
+    // would silently disable the "longer path first" rule.
+    int byLength = descending(path1.length(), path2.length());
+    if (byLength != 0) {
+      return byLength;
+    }
+
+    return path1.compareTo(path2);
+  }
+
+  /** Number of "/" separators in the path; the root path "/" counts as depth 0. */
+  private static int depthOf(String path) {
+    return "/".equals(path) ? 0 : path.split("/", -1).length - 1;
+  }
+
+  /** Returns -1 if {@code a > b}, 1 if {@code a < b}, and 0 if they are equal. */
+  private static int descending(int a, int b) {
+    if (a > b) {
+      return -1;
+    }
+    return a < b ? 1 : 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
new file mode 100644
index 0000000..2daf0f5
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/DiskUtil.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Discovers the disk devices of the local machine and the paths they are mounted on.
+ *
+ * <p>Devices are read from {@code /proc/partitions} and mount points from the output of
+ * the {@code mount} command. On Windows, Solaris and Mac, or when
+ * {@code /proc/partitions} does not exist, a single device named "default" is used.
+ */
+public class DiskUtil {
+
+  public enum OSType {
+    OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
+  }
+
+  /**
+   * Classifies the running OS from the {@code os.name} system property. Every Windows
+   * version maps to {@link OSType#OS_TYPE_WINXP}, so that releases newer than Windows 7
+   * are not mistaken for Unix and probed with {@code mount}.
+   */
+  static private OSType getOSType() {
+    String osName = System.getProperty("os.name");
+    if (osName.contains("Windows")) {
+      return OSType.OS_TYPE_WINXP;
+    } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
+      return OSType.OS_TYPE_SOLARIS;
+    } else if (osName.contains("Mac")) {
+      return OSType.OS_TYPE_MAC;
+    } else {
+      return OSType.OS_TYPE_UNIX;
+    }
+  }
+
+  /**
+   * Returns the disk devices of this machine. On a Unix-type OS each device also carries
+   * the mount points found for it; on any other OS the list holds only the "default" device.
+   *
+   * @throws IOException if the {@code mount} command cannot be started or its output read
+   */
+  public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
+    List<DiskDeviceInfo> deviceInfos;
+
+    if(getOSType() == OSType.OS_TYPE_UNIX) {
+      deviceInfos = getUnixDiskDeviceInfos();
+      setDeviceMountInfo(deviceInfos);
+    } else {
+      deviceInfos = getDefaultDiskDeviceInfos();
+    }
+
+    return deviceInfos;
+  }
+
+  /**
+   * Builds one device per distinct device name listed in {@code /proc/partitions}, with
+   * ids assigned in sorted-name order starting at 0.
+   */
+  private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
+    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+    File file = new File("/proc/partitions");
+    if(!file.exists()) {
+      System.out.println("No partition file:" + file.getAbsolutePath());
+      return getDefaultDiskDeviceInfos();
+    }
+
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+      String line = null;
+
+      int count = 0;
+      Set<String> deviceNames = new TreeSet<String>();
+      while((line = reader.readLine()) != null) {
+        // count > 0 skips the first line of the file
+        if(count > 0 && !line.trim().isEmpty()) {
+          String[] tokens = line.trim().split(" +");
+          if(tokens.length == 4) {
+            String deviceName = getDiskDeviceName(tokens[3]);
+            deviceNames.add(deviceName);
+          }
+        }
+        count++;
+      }
+
+      int id = 0;
+      for(String eachDeviceName: deviceNames) {
+        DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
+        diskDeviceInfo.setName(eachDeviceName);
+
+        //TODO set addtional info
+        // /sys/block/sda/queue
+        infos.add(diskDeviceInfo);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      if(reader != null) {
+        try {
+          reader.close();
+        } catch (IOException ignored) {
+          // nothing useful can be done if closing a read-only stream fails
+        }
+      }
+    }
+
+    return infos;
+  }
+
+  /** Returns the part of a partition name before its first digit, e.g. "sda1" gives "sda". */
+  private static String getDiskDeviceName(String partitionName) {
+    int end = 0;
+    while (end < partitionName.length()
+        && !(partitionName.charAt(end) >= '0' && partitionName.charAt(end) <= '9')) {
+      end++;
+    }
+    return partitionName.substring(0, end);
+  }
+
+  /** Returns a list holding a single device with id 0 and name "default". */
+  private static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
+    DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
+    diskDeviceInfo.setName("default");
+
+    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
+
+    infos.add(diskDeviceInfo);
+
+    return infos;
+  }
+
+  /**
+   * Runs {@code mount} and attaches every mount point of a {@code /dev/<name>} device to
+   * the matching entry of {@code deviceInfos}. Output lines that do not have the form
+   * {@code <device> on /<path> ...} are skipped.
+   *
+   * @throws IOException if the command cannot be started or its output cannot be read
+   */
+  private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
+    Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
+    for(DiskDeviceInfo eachDevice: deviceInfos) {
+      deviceMap.put(eachDevice.getName(), eachDevice);
+    }
+
+    BufferedReader mountOutput = null;
+    try {
+      Process mountProcess = Runtime.getRuntime().exec("mount");
+      mountOutput = new BufferedReader(new InputStreamReader(
+          mountProcess.getInputStream()));
+      String line;
+      while ((line = mountOutput.readLine()) != null) {
+        int indexStart = line.indexOf(" on /");
+        if (indexStart < 0) {
+          // Not a "<device> on /<path>" line; substring() below would throw on -1.
+          continue;
+        }
+        // indexStart + 4 is the position of the '/' that starts the mount path.
+        int indexEnd = line.indexOf(" ", indexStart + 4);
+        if (indexEnd < 0) {
+          indexEnd = line.length(); // the path runs to the end of the line
+        }
+
+        String deviceName = line.substring(0, indexStart).trim();
+        String[] deviceNameTokens = deviceName.split("/");
+        if(deviceNameTokens.length == 3) {
+          if("dev".equals(deviceNameTokens[1])) {
+            String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
+            String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();
+
+            DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
+            if(diskDeviceInfo != null) {
+              diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
+            }
+          }
+        }
+      }
+    } finally {
+      if (mountOutput != null) {
+        mountOutput.close();
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    System.out.println("/dev/sde1".split("/").length);
+    for(String eachToken: "/dev/sde1".split("/")) {
+      System.out.println(eachToken);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
new file mode 100644
index 0000000..10f12be
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScanRunner.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Thread that runs one scheduling slot of a {@link FileScannerV2}.
+ *
+ * <p>The thread calls {@link FileScannerV2#scan(int)} once with the byte budget read from
+ * the storage manager context and then, whether or not the scan succeeded, decrements the
+ * shared running-scanner counter and wakes every thread waiting on the request queue
+ * monitor.
+ */
+public class FileScanRunner extends Thread {
+  private static final Log LOG = LogFactory.getLog(FileScanRunner.class);
+
+  // Fields keep their original (package-private) visibility; other classes of this
+  // package may read them.
+  StorageManagerV2.StorgaeManagerContext smContext;
+  FileScannerV2 fileScanner;
+  Object requestQueueMonitor;
+  AtomicInteger numOfRunningScanners;
+  DiskFileScanScheduler diskFileScanScheduler;
+
+  /** Byte budget handed to {@link FileScannerV2#scan(int)}. */
+  int maxReadBytes;
+
+  /**
+   * @param diskFileScanScheduler scheduler that created this runner
+   * @param smContext             storage manager context; supplies the per-slot byte budget
+   * @param fileScanner           scanner to run for one slot; its id becomes part of the thread name
+   * @param requestQueueMonitor   monitor that is notified when this runner finishes
+   * @param numOfRunningScanners  counter that is decremented when this runner finishes
+   */
+  public FileScanRunner(DiskFileScanScheduler diskFileScanScheduler,
+      StorageManagerV2.StorgaeManagerContext smContext,
+      FileScannerV2 fileScanner, Object requestQueueMonitor,
+      AtomicInteger numOfRunningScanners) {
+    super("FileScanRunner:" + fileScanner.getId());
+    this.diskFileScanScheduler = diskFileScanScheduler;
+    this.fileScanner = fileScanner;
+    this.smContext = smContext;
+    this.requestQueueMonitor = requestQueueMonitor;
+    this.numOfRunningScanners = numOfRunningScanners;
+
+    this.maxReadBytes = smContext.getMaxReadBytesPerScheduleSlot();
+  }
+
+  /**
+   * Runs a single scan slot. Failures are logged, not rethrown, so that the bookkeeping
+   * in the {@code finally} block always happens.
+   */
+  @Override
+  public void run() {
+    try {
+      fileScanner.scan(maxReadBytes);
+    } catch (Exception e) {
+      // Name the scanner: e.getMessage() alone may be null and says nothing about the file.
+      LOG.error("Scan failed for " + fileScanner + ": " + e.getMessage(), e);
+    } finally {
+      synchronized (requestQueueMonitor) {
+        numOfRunningScanners.decrementAndGet();
+        requestQueueMonitor.notifyAll();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
new file mode 100644
index 0000000..44c48a5
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/FileScannerV2.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Scanner;
+import org.apache.tajo.storage.Tuple;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Base class of scanners that are filled asynchronously.
+ *
+ * <p>A scan thread calls {@link #scan(int)}, which pulls tuples from the concrete scanner
+ * through {@link #getNextTuple()} and buffers them in {@link #tuplePool}. The consumer calls
+ * {@link #next()}, which takes tuples out of that buffer and blocks while it is empty and
+ * the scanner is not closed. The buffer object itself is the monitor used for the
+ * wait/notify handshake between the two sides.
+ */
+public abstract class FileScannerV2 implements Scanner {
+  private static final Log LOG = LogFactory.getLog(FileScannerV2.class);
+
+  /** Threshold compared against {@link #tuplePoolMemory} by {@link #isFetchProcessing()}. */
+  private static final int FETCH_PROCESSING_THRESHOLD = 16 * 1024 * 1024;
+
+  protected AtomicBoolean fetchProcessing = new AtomicBoolean(false);
+
+  protected AtomicBoolean closed = new AtomicBoolean(false);
+
+  protected FileSystem fs;
+
+  protected boolean inited = false;
+  protected final Configuration conf;
+  protected final TableMeta meta;
+  protected final Schema schema;
+  protected final Fragment fragment;
+  protected final int columnNum;
+  protected Column[] targets;
+
+  protected StorageManagerV2.StorgaeManagerContext smContext;
+
+  protected boolean firstSchdeuled = true;
+
+  /** Buffer between the scan thread and the consumer; also the wait/notify monitor. */
+  protected Queue<Tuple> tuplePool;
+
+  /** Running sum of {@code Tuple.size()} over the tuples currently buffered. */
+  AtomicInteger tuplePoolMemory = new AtomicInteger();
+
+  long lastScanScheduleTime;
+
+  /** Identifier of this scanner, fixed at construction so that it can serve as a key. */
+  private final String id;
+
+  /**
+   * Produces the next tuple of the fragment.
+   *
+   * @return the next tuple, or {@code null} when there is nothing more to read
+   */
+  protected abstract Tuple getNextTuple() throws IOException;
+
+  /** Called by {@link #scan(int)} before the first tuple is read after {@link #init()}. */
+  protected abstract void initFirstScan() throws IOException;
+
+  /** @return the current read position, used to enforce the per-slot byte budget */
+  protected abstract long getFilePosition() throws IOException;
+
+  /**
+   * @param conf     configuration used to resolve the file system of the fragment
+   * @param meta     table meta; supplies the schema
+   * @param fragment fragment to scan
+   * @throws IOException if the file system of the fragment path cannot be obtained
+   */
+  public FileScannerV2(final Configuration conf,
+                       final TableMeta meta,
+                       final Fragment fragment) throws IOException {
+    this.conf = conf;
+    this.meta = meta;
+    this.schema = meta.getSchema();
+    this.fragment = fragment;
+    this.columnNum = this.schema.getColumnNum();
+
+    this.fs = fragment.getPath().getFileSystem(conf);
+
+    this.id = fragment.getPath().toString() + ":" + fragment.getStartOffset() + ":" +
+        fragment.getLength() + "_" + System.currentTimeMillis();
+
+    tuplePool = new ConcurrentLinkedQueue<Tuple>();
+  }
+
+  /**
+   * Resets the scanner state, empties the buffer and, on the first call since construction
+   * or {@link #reset()}, asks the storage manager context to schedule a scan.
+   */
+  public void init() throws IOException {
+    closed.set(false);
+    fetchProcessing.set(false);
+    firstSchdeuled = true;
+    if (tuplePool == null) {
+      tuplePool = new ConcurrentLinkedQueue<Tuple>();
+    }
+    tuplePool.clear();
+    // The buffer is empty now, so the accounted size must start from zero again;
+    // otherwise isFetchProcessing() keeps counting tuples that were just dropped.
+    tuplePoolMemory.set(0);
+
+    if (!inited) {
+      smContext.requestFileScan(this);
+    }
+    inited = true;
+  }
+
+  /** Closes the scanner and initializes it again, which requests a new scan. */
+  @Override
+  public void reset() throws IOException {
+    close();
+    inited = false;
+
+    init();
+  }
+
+  /**
+   * @return an identifier built from the fragment path, start offset, length and the
+   *         construction time; the same value is returned on every call
+   */
+  public String getId() {
+    return id;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  /**
+   * @param targets columns to project
+   * @throws IllegalStateException if the scanner has already been initialized
+   */
+  @Override
+  public void setTarget(Column[] targets) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+    this.targets = targets;
+  }
+
+  public Path getPath() {
+    return fragment.getPath();
+  }
+
+  /** @return the first disk id of the fragment, or {@code -1} when the fragment has none */
+  public int getDiskId() {
+    if (fragment.getDiskIds().length <= 0) {
+      return -1;
+    } else {
+      return fragment.getDiskIds()[0];
+    }
+  }
+
+  /**
+   * Accepts a search condition; this base implementation only validates the call order.
+   *
+   * @throws IllegalStateException if the scanner has already been initialized
+   */
+  public void setSearchCondition(Object expr) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+  }
+
+  public void setStorageManagerContext(StorageManagerV2.StorgaeManagerContext context) {
+    this.smContext = context;
+  }
+
+  /** @return {@code true} while the accounted size of the buffered tuples exceeds the threshold */
+  public boolean isFetchProcessing() {
+    return tuplePoolMemory.get() > FETCH_PROCESSING_THRESHOLD;
+  }
+
+  @Override
+  public String toString() {
+    return fragment.getPath() + ":" + fragment.getStartOffset();
+  }
+
+  /**
+   * Reads tuples into the buffer until the concrete scanner is exhausted or, checked every
+   * 1000 records, at least {@code maxBytesPerSchedule} bytes have been consumed. Waiting
+   * consumers are notified every 1000 records and once more at the end. Unless the scanner
+   * was closed meanwhile, another scan slot is requested.
+   *
+   * @param maxBytesPerSchedule byte budget of this slot
+   */
+  public void scan(int maxBytesPerSchedule) throws IOException {
+    if (firstSchdeuled) {
+      initFirstScan();
+      firstSchdeuled = false;
+    }
+    long scanStartPos = getFilePosition();
+    int recordCount = 0;
+    while (true) {
+      Tuple tuple = getNextTuple();
+      if (tuple == null) {
+        break;
+      }
+      tuplePoolMemory.addAndGet(tuple.size());
+      tuplePool.offer(tuple);
+      recordCount++;
+      if (recordCount % 1000 == 0) {
+        if (getFilePosition() - scanStartPos >= maxBytesPerSchedule) {
+          break;
+        } else {
+          synchronized (tuplePool) {
+            tuplePool.notifyAll();
+          }
+        }
+      }
+    }
+    if (tuplePool != null) {
+      synchronized (tuplePool) {
+        tuplePool.notifyAll();
+      }
+    }
+    if (!isClosed()) {
+      smContext.requestFileScan(this);
+    }
+  }
+
+  /** Test hook: blocks on the {@code fetchProcessing} monitor until notified or interrupted. */
+  public void waitScanStart() {
+    //for test
+    synchronized (fetchProcessing) {
+      try {
+        fetchProcessing.wait();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller instead of dropping it.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** Marks the scanner closed (only the first call has an effect) and wakes waiting consumers. */
+  @Override
+  public void close() throws IOException {
+    // Atomic transition, so two racing callers cannot both run the close logic.
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    Queue<Tuple> pool = tuplePool;
+    if (pool != null) {
+      synchronized (pool) {
+        pool.notifyAll();
+      }
+    }
+    LOG.info(toString() + " closed");
+  }
+
+  public boolean isClosed() {
+    return closed.get();
+  }
+
+  /**
+   * Takes the next buffered tuple, blocking while the buffer is empty and the scanner is
+   * still open.
+   *
+   * @return the next tuple, or {@code null} when the scanner is closed and the buffer is
+   *         drained, or when the waiting thread is interrupted (the interrupt flag is
+   *         restored in that case)
+   */
+  public Tuple next() throws IOException {
+    final Queue<Tuple> pool = tuplePool;
+    if (pool == null) {
+      return null;
+    }
+
+    // Fast path without taking the monitor.
+    Tuple tuple = pool.poll();
+    if (tuple == null) {
+      synchronized (pool) {
+        // Re-check under the monitor: scan() and close() notify while holding it, so a
+        // tuple offered (or a close performed) after this check cannot be missed.
+        while ((tuple = pool.poll()) == null) {
+          if (isClosed()) {
+            // The buffer reference is kept (not nulled) because a scan thread may
+            // still be using it.
+            return null;
+          }
+          try {
+            pool.wait();
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return null;
+          }
+        }
+      }
+    }
+
+    tuplePoolMemory.addAndGet(-tuple.size());
+    return tuple;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
new file mode 100644
index 0000000..11c3291
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/RCFileScanner.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.storage.rcfile.BytesRefArrayWritable;
+import org.apache.tajo.storage.rcfile.ColumnProjectionUtils;
+import org.apache.tajo.storage.rcfile.RCFile;
+import org.apache.tajo.util.Bytes;
+import org.apache.tajo.util.TUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/**
+ * {@link FileScannerV2} implementation that reads the rows of an RCFile fragment and
+ * converts the projected columns into tuples.
+ */
+public class RCFileScanner extends FileScannerV2 {
+  private static final Log LOG = LogFactory.getLog(RCFileScanner.class);
+
+  private RCFile.Reader in;
+  private long start;
+  private long end;
+  private boolean more = true;
+  private LongWritable key;
+  private BytesRefArrayWritable column;
+  /** For each target column, the id of that column in the table schema. */
+  private Integer [] projectionMap;
+
+  /**
+   * @param conf     configuration passed to the reader
+   * @param meta     table meta; supplies the schema
+   * @param fragment fragment to scan; its start offset and length bound the scan
+   */
+  public RCFileScanner(final Configuration conf,
+                       final TableMeta meta,
+                       final Fragment fragment) throws IOException {
+    super(conf, meta, fragment);
+
+    this.start = fragment.getStartOffset();
+    this.end = start + fragment.getLength();
+    key = new LongWritable();
+    column = new BytesRefArrayWritable();
+  }
+
+  /**
+   * Reads the next row and converts it into a tuple.
+   *
+   * @return the tuple, or {@code null} after closing the scanner when no row is left
+   */
+  @Override
+  protected Tuple getNextTuple() throws IOException {
+    more = next(key);
+
+    if (!more) {
+      close();
+      return null;
+    }
+
+    column.clear();
+    in.getCurrentRow(column);
+    return makeTuple();
+  }
+
+  /**
+   * Builds a tuple from the current row. Only the projected columns are set; a column
+   * whose value has length zero is stored as a NULL datum.
+   *
+   * @throws IOException if a target column has a type this scanner does not handle
+   */
+  private Tuple makeTuple() throws IOException {
+    column.resetValid(schema.getColumnNum());
+    Tuple tuple = new VTuple(schema.getColumnNum());
+    for (int i = 0; i < projectionMap.length; i++) {
+      int tid = projectionMap[i]; // target column id
+      // if the column is byte[0], it presents a NULL value.
+      if (column.get(tid).getLength() == 0) {
+        tuple.put(tid, DatumFactory.createNullDatum());
+        continue;
+      }
+
+      switch (targets[i].getDataType().getType()) {
+        case BOOLEAN:
+          tuple.put(tid, DatumFactory.createBool(column.get(tid).getBytesCopy()[0]));
+          break;
+        case BIT:
+          tuple.put(tid, DatumFactory.createBit(column.get(tid).getBytesCopy()[0]));
+          break;
+        case CHAR:
+          tuple.put(tid, DatumFactory.createChar(column.get(tid).getBytesCopy()));
+          break;
+        case INT2:
+          tuple.put(tid,
+              DatumFactory.createInt2(Bytes.toShort(column.get(tid).getBytesCopy())));
+          break;
+        case INT4:
+          tuple.put(tid,
+              DatumFactory.createInt4(Bytes.toInt(column.get(tid).getBytesCopy())));
+          break;
+        case INT8:
+          tuple.put(tid,
+              DatumFactory.createInt8(Bytes.toLong(column.get(tid).getBytesCopy())));
+          break;
+        case FLOAT4:
+          tuple.put(tid,
+              DatumFactory.createFloat4(Bytes.toFloat(column.get(tid).getBytesCopy())));
+          break;
+        case FLOAT8:
+          tuple.put(tid,
+              DatumFactory.createFloat8(Bytes.toDouble(column.get(tid).getBytesCopy())));
+          break;
+        case INET4:
+          tuple.put(tid, DatumFactory.createInet4(column.get(tid).getBytesCopy()));
+          break;
+        case TEXT:
+          tuple.put(tid, DatumFactory.createText(column.get(tid).getBytesCopy()));
+          break;
+        case BLOB:
+          tuple.put(tid, DatumFactory.createBlob(column.get(tid).getBytesCopy()));
+          break;
+        default:
+          throw new IOException("Unsupported data type: "
+              + targets[i].getDataType().getType());
+      }
+    }
+
+    return tuple;
+  }
+
+  /**
+   * Prepares the projection (all schema columns when no target was set) and the row
+   * buffer, then delegates to the base class.
+   */
+  @Override
+  public void init() throws IOException {
+    if (targets == null) {
+      targets = schema.toArray();
+    }
+
+    prepareProjection(targets);
+
+    // close() releases the row buffer; re-create it so that a scan after reset() does
+    // not dereference null.
+    if (column == null) {
+      column = new BytesRefArrayWritable();
+    }
+
+    super.init();
+  }
+
+  /** Maps every target column to its schema column id and registers the ids to read. */
+  private void prepareProjection(Column[] targets) {
+    projectionMap = new Integer[targets.length];
+    for (int i = 0; i < targets.length; i++) {
+      projectionMap[i] = schema.getColumnIdByName(targets[i].getColumnName());
+    }
+    ArrayList<Integer> projectionIdList = new ArrayList<Integer>(TUtil.newList(projectionMap));
+    ColumnProjectionUtils.setReadColumnIDs(conf, projectionIdList);
+  }
+
+  /**
+   * Closes the reader (a failure is logged, not propagated), releases the row buffer and
+   * closes the base scanner.
+   */
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      try {
+        in.close();
+      } catch (Exception e) {
+        // Best effort: closing must still complete, but the failure should be visible.
+        LOG.warn("Failed to close RCFile reader of " + fragment.getPath()
+            + ": " + e.getMessage(), e);
+      } finally {
+        in = null;
+      }
+    }
+
+    if (column != null) {
+      column.clear();
+      column = null;
+    }
+    super.close();
+  }
+
+  /**
+   * Advances the reader by one row.
+   *
+   * @return {@code false} when the reader has no more rows or the last seen sync position
+   *         is at or beyond the end of the fragment
+   */
+  private boolean next(LongWritable key) throws IOException {
+    if (!more) {
+      return false;
+    }
+
+    more = in.next(key);
+    if (!more) {
+      return false;
+    }
+
+    if (in.lastSeenSyncPos() >= end) {
+      more = false;
+    }
+    return more;
+  }
+
+  /**
+   * Opens the reader and positions it at the first sync point at or after the fragment
+   * start. Does nothing when the first scan has already been initialized.
+   */
+  @Override
+  protected void initFirstScan() throws IOException {
+    if (!firstSchdeuled) {
+      return;
+    }
+    this.in = new RCFile.Reader(fs, fragment.getPath(), conf);
+
+    if (start > in.getPosition()) {
+      in.sync(start); // sync to start
+    }
+    this.start = in.getPosition();
+    more = start < end;
+    firstSchdeuled = false;
+  }
+
+  @Override
+  protected long getFilePosition() throws IOException {
+    return in.getPosition();
+  }
+
+  @Override
+  public void reset() throws IOException {
+    super.reset();
+  }
+
+  @Override
+  public boolean isProjectable() {
+    return true;
+  }
+
+  @Override
+  public boolean isSelectable() {
+    return false;
+  }
+
+  @Override
+  public boolean isSplittable(){
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
new file mode 100644
index 0000000..eca590f
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/ScanScheduler.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.storage.v2.StorageManagerV2.StorgaeManagerContext;
+
+/**
+ * Thread that takes scan requests from the storage manager's scan queue and hands each
+ * one to the {@link DiskFileScanScheduler} of a disk.
+ *
+ * <p>One {@link DiskFileScanScheduler} is created and started per disk device reported by
+ * {@link DiskUtil#getDiskDeviceInfos()} when this scheduler is constructed.
+ */
+public class ScanScheduler extends Thread {
+  private static final Log LOG = LogFactory.getLog(ScanScheduler.class);
+
+  private Object scanQueueLock;
+  private StorgaeManagerContext context;
+
+  private Map<Integer, DiskFileScanScheduler> diskFileScannerMap = new HashMap<Integer, DiskFileScanScheduler>();
+
+  private Map<Integer, DiskDeviceInfo> diskDeviceInfoMap = new HashMap<Integer, DiskDeviceInfo>();
+
+  private SortedSet<DiskMountInfo> diskMountInfos = new TreeSet<DiskMountInfo>();
+
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+
+  private Random rand = new Random(System.currentTimeMillis());
+
+  /**
+   * Collects the disk devices and their mount points and starts one per-disk scheduler
+   * for each device. A failure during this discovery is logged and leaves the scheduler
+   * with whatever was registered up to that point.
+   *
+   * @param context storage manager context providing the scan queue and its lock
+   */
+  public ScanScheduler(StorgaeManagerContext context) {
+    this.context = context;
+    this.scanQueueLock = context.getScanQueueLock();
+
+    try {
+      List<DiskDeviceInfo> deviceInfos = DiskUtil.getDiskDeviceInfos();
+      for (DiskDeviceInfo eachInfo : deviceInfos) {
+        LOG.info("Create DiskScanQueue:" + eachInfo.getName());
+        diskDeviceInfoMap.put(eachInfo.getId(), eachInfo);
+
+        diskMountInfos.addAll(eachInfo.getMountInfos());
+      }
+
+      initFileScanners();
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Dispatch loop: while not stopped, polls the scan queue under the scan queue lock,
+   * waits on that lock when the queue is empty, and forwards each request to the scheduler
+   * of the selected disk. An interrupt while waiting ends the loop.
+   */
+  @Override
+  public void run() {
+    synchronized (scanQueueLock) {
+      while (!stopped.get()) {
+        FileScannerV2 fileScannerV2 = context.getScanQueue().poll();
+        if (fileScannerV2 == null) {
+          try {
+            scanQueueLock.wait();
+          } catch (InterruptedException e) {
+            break;
+          }
+          continue;
+        }
+
+        int diskId = selectDiskId(fileScannerV2);
+
+        synchronized (diskFileScannerMap) {
+          DiskFileScanScheduler diskScheduler = diskFileScannerMap.get(diskId);
+          if (diskScheduler == null) {
+            // Without this guard a missing scheduler (e.g. no disk was detected) throws
+            // and terminates the dispatch thread for every later request as well.
+            LOG.error("No disk scan scheduler for disk id " + diskId
+                + ", cannot schedule " + fileScannerV2);
+            continue;
+          }
+          diskScheduler.requestScanFile(fileScannerV2);
+        }
+      }
+    }
+  }
+
+  /**
+   * Chooses the disk for a request: the scanner's own disk id when it is within
+   * {@code [0, number of devices)}, otherwise the device whose mount path prefixes the
+   * scanner's path, otherwise the disk with the smallest queue, otherwise a random one.
+   *
+   * @return the chosen disk id, or a negative value when no disk is registered
+   */
+  private int selectDiskId(FileScannerV2 fileScannerV2) {
+    int diskId = fileScannerV2.getDiskId();
+    if (diskId >= 0 && diskId < diskDeviceInfoMap.size()) {
+      return diskId;
+    }
+
+    diskId = findDiskPartitionPath(fileScannerV2.getPath().toString());
+    if (diskId >= 0) {
+      return diskId;
+    }
+
+    diskId = findMinQueueDisk();
+    // Random.nextInt requires a positive bound, so only draw when a device exists.
+    if (diskId < 0 && !diskDeviceInfoMap.isEmpty()) {
+      diskId = rand.nextInt(diskDeviceInfoMap.size());
+    }
+    return diskId;
+  }
+
+  /** @return the id of a disk with the smallest total queue size, or {@code -1} if there is none */
+  private int findMinQueueDisk() {
+    int minValue = Integer.MAX_VALUE;
+    int minId = -1;
+    synchronized (diskFileScannerMap) {
+      for (DiskFileScanScheduler eachDiskScanner : diskFileScannerMap.values()) {
+        int queueSize = eachDiskScanner.getTotalQueueSize();
+        if (queueSize <= minValue) {
+          minValue = queueSize;
+          minId = eachDiskScanner.getDiskId();
+        }
+      }
+    }
+
+    return minId;
+  }
+
+  /**
+   * @return the device id of the first mount (in the set's iteration order) whose mount
+   *         path is a prefix of {@code fullPath}, or {@code -1} when none matches
+   */
+  private int findDiskPartitionPath(String fullPath) {
+    for (DiskMountInfo eachMountInfo : diskMountInfos) {
+      if (fullPath.startsWith(eachMountInfo.getMountPath())) {
+        return eachMountInfo.getDeviceId();
+      }
+    }
+
+    return -1;
+  }
+
+  /** Creates, starts and registers one per-disk scheduler for every known device. */
+  private void initFileScanners() {
+    for (Map.Entry<Integer, DiskDeviceInfo> eachEntry : diskDeviceInfoMap.entrySet()) {
+      DiskFileScanScheduler scanner = new DiskFileScanScheduler(context, eachEntry.getValue());
+      scanner.start();
+
+      diskFileScannerMap.put(eachEntry.getKey(), scanner);
+    }
+  }
+
+  /** Stops the dispatch loop and every per-disk scheduler, then interrupts this thread. */
+  public void stopScheduler() {
+    stopped.set(true);
+    for (DiskFileScanScheduler eachDiskScanner : diskFileScannerMap.values()) {
+      eachDiskScanner.stopScan();
+    }
+    this.interrupt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/5ad7fbae/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
new file mode 100644
index 0000000..1ba6048
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/v2/StorageManagerV2.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.v2;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.AbstractStorageManager;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Fragment;
+import org.apache.tajo.storage.Scanner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.Queue;
+
+public class StorageManagerV2 extends AbstractStorageManager {
+  private final Log LOG = LogFactory.getLog(StorageManagerV2.class);
+
+	private Queue<FileScannerV2> scanQueue = new LinkedList<FileScannerV2>();
+	
+	private Object scanQueueLock = new Object();
+	
+	private Object scanDataLock = new Object();
+	
+	private ScanScheduler scanScheduler;
+	
+	private StorgaeManagerContext context;
+
+  public StorageManagerV2(TajoConf conf) throws IOException {
+    super(conf);
+		context = new StorgaeManagerContext();
+		scanScheduler = new ScanScheduler(context);
+		scanScheduler.start();
+    LOG.info("StorageManager v2 started...");
+	}
+
+  @Override
+  public Scanner getScanner(TableMeta meta, Fragment fragment,
+                            Schema target) throws IOException {
+    Scanner scanner;
+
+    Class<? extends Scanner> scannerClass;
+
+    String handlerName = meta.getStoreType().name().toLowerCase();
+    String handlerNameKey = handlerName + "_v2";
+
+    scannerClass = SCANNER_HANDLER_CACHE.get(handlerNameKey);
+    if (scannerClass == null) {
+      scannerClass = conf.getClass(
+          String.format("tajo.storage.scanner-handler.v2.%s.class",
+              meta.getStoreType().name().toLowerCase()), null,
+          Scanner.class);
+      SCANNER_HANDLER_CACHE.put(handlerNameKey, scannerClass);
+    }
+
+    if (scannerClass == null) {
+      throw new IOException("Unknown Storage Type: " + meta.getStoreType());
+    }
+
+    scanner = newScannerInstance(scannerClass, conf, meta, fragment);
+    if (scanner.isProjectable()) {
+      scanner.setTarget(target.toArray());
+    }
+
+    if(scanner instanceof FileScannerV2) {
+      ((FileScannerV2)scanner).setStorageManagerContext(context);
+    }
+    return scanner;
+  }
+
+	public void requestFileScan(FileScannerV2 fileScanner) {
+		synchronized(scanQueueLock) {
+			scanQueue.offer(fileScanner);
+			
+			scanQueueLock.notifyAll();
+		}
+	}
+
+	public StorgaeManagerContext getContext() {
+		return context;
+	}
+
+  public class StorgaeManagerContext {
+		public Object getScanQueueLock() {
+			return scanQueueLock;
+		}
+
+		public Object getScanDataLock() {
+			return scanDataLock;
+		}
+		
+		public Queue<FileScannerV2> getScanQueue() {
+			return scanQueue;
+		}
+
+		public int getMaxReadBytesPerScheduleSlot() {
+			return conf.getInt("tajo.storage.manager.maxReadBytes", 8 * 1024 * 1024);		//8MB
+		}
+
+    public void requestFileScan(FileScannerV2 fileScanner) {
+      StorageManagerV2.this.requestFileScan(fileScanner);
+    }
+
+    public TajoConf getConf() {
+      return conf;
+    }
+  }
+
+	public void stop() {
+		if(scanScheduler != null) {
+			scanScheduler.stopScheduler();
+		}
+	}
+}


Mime
View raw message