tajo-commits mailing list archives

From jh...@apache.org
Subject git commit: TAJO-134: Support for compression/decompression of CSVFile. (jinho)
Date Mon, 02 Sep 2013 13:23:12 GMT
Updated Branches:
  refs/heads/master 156b91ab7 -> bc8e1804c


TAJO-134: Support for compression/decompression of CSVFile. (jinho)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tajo/commit/bc8e1804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tajo/tree/bc8e1804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tajo/diff/bc8e1804

Branch: refs/heads/master
Commit: bc8e1804c8ec5e02f09c77a2cfe977520f76278d
Parents: 156b91a
Author: jinossy <jinossy@gmail.com>
Authored: Mon Sep 2 22:21:31 2013 +0900
Committer: jinossy <jinossy@gmail.com>
Committed: Mon Sep 2 22:21:31 2013 +0900

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 .../java/org/apache/tajo/catalog/TableMeta.java |   4 +-
 .../apache/tajo/master/TaskSchedulerImpl.java   |   4 +-
 .../tajo/master/YarnTaskRunnerLauncherImpl.java |   7 -
 .../master/rm/YarnRMContainerAllocator.java     |  28 ++-
 .../tajo/worker/YarnResourceAllocator.java      |   6 +-
 .../java/org/apache/tajo/storage/CSVFile.java   | 227 ++++++++++++++-----
 .../apache/tajo/storage/compress/CodecPool.java | 185 +++++++++++++++
 .../apache/tajo/storage/rcfile/CodecPool.java   | 165 --------------
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  14 +-
 .../tajo/storage/TestCompressionStorages.java   | 209 +++++++++++++++++
 tajo-dist/src/main/bin/tajo                     |   7 +
 12 files changed, 612 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8fb23fe..9f96db0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,8 @@ Release 0.2.0 - unreleased
 
   NEW FEATURES
 
+    TAJO-134: Support for compression/decompression of CSVFile. (jinho)
+
     TAJO-59: Implement Char Datum Type. (jihoon)
    
     TAJO-96: Design and implement rewrite rule interface and the rewrite rule 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
index 184bd33..a143d34 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TableMeta.java
@@ -28,7 +28,9 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 
 public interface TableMeta extends ProtoObject<TableProto>, Cloneable, GsonObject {
-  
+
+  static String COMPRESSION_CODEC = "compression.codec";
+
   void setStorageType(StoreType storeType);
   
   StoreType getStoreType();
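
COMPRESSION_CODEC resolves to the table option key "compression.codec", which the new
CSVAppender below reads to pick a Hadoop codec. A minimal sketch of enabling it, modeled
on the TestCompressionStorages test added by this commit (the codec choice is illustrative):

    Schema schema = new Schema();
    schema.addColumn("id", Type.INT4);
    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
    // any org.apache.hadoop.io.compress.CompressionCodec class name works here
    meta.putOption(TableMeta.COMPRESSION_CODEC, GzipCodec.class.getCanonicalName());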

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
index 17b6168..9b454f6 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/TaskSchedulerImpl.java
@@ -111,7 +111,7 @@ public class TaskSchedulerImpl extends AbstractService
 
         while(!stopEventHandling && !Thread.currentThread().isInterrupted()) {
           try {
-            Thread.sleep(1000);
+            Thread.sleep(100);
           } catch (InterruptedException e) {
             break;
           }
@@ -324,7 +324,7 @@ public class TaskSchedulerImpl extends AbstractService
 
       if(volumeEntry != null){
         volumeUsageMap.put(volumeEntry.getKey(), volumeEntry.getValue() + 1);
-        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", concurrency : "
+        LOG.info("Assigned host : " + host + " Volume : " + volumeEntry.getKey() + ", Concurrency : "
             + volumeUsageMap.get(volumeEntry.getKey()));
         return volumeEntry.getKey();
       } else {

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
index 5ac4fb5..472d61d 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/YarnTaskRunnerLauncherImpl.java
@@ -80,13 +80,6 @@ public class YarnTaskRunnerLauncherImpl extends AbstractService implements TaskR
   public void stop() {
     executorService.shutdownNow();
 
-    while(!executorService.isTerminated()) {
-      LOG.info("====>executorService.isTerminated:" + executorService.isTerminated() + "," + executorService.isShutdown());
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-    }
     Map<ContainerId, ContainerProxy> containers = context.getResourceAllocator().getContainers();
     for(ContainerProxy eachProxy: containers.values()) {
       try {
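
The deleted loop busy-waited on isTerminated() after shutdownNow(), which already
interrupts running tasks, so it only stalled the container cleanup below. If a bounded
wait were still desired, the standard ExecutorService idiom is awaitTermination; a
sketch of that alternative (not part of this commit, and it needs a
java.util.concurrent.TimeUnit import):

    executorService.shutdownNow();
    try {
      // give in-flight tasks a bounded window to exit; 10s is illustrative
      if (!executorService.awaitTermination(10, TimeUnit.SECONDS)) {
        LOG.warn("Executor service did not terminate in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }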

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
index b159983..4392158 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/master/rm/YarnRMContainerAllocator.java
@@ -48,6 +48,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class YarnRMContainerAllocator extends AMRMClientImpl
     implements EventHandler<ContainerAllocationEvent> {
@@ -98,7 +99,7 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
 
   protected Thread allocatorThread;
   private final AtomicBoolean stopped = new AtomicBoolean(false);
-  private int rmPollInterval = 1000;//millis
+  private int rmPollInterval = 100;//millis
 
   protected void startAllocatorThread() {
     allocatorThread = new Thread(new Runnable() {
@@ -167,6 +168,9 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
   private final Map<Priority, ExecutionBlockId> subQueryMap =
       new HashMap<Priority, ExecutionBlockId>();
 
+  private AtomicLong prevReportTime = new AtomicLong(0);
+  private int reportInterval = 5 * 1000; // 5 seconds, in millis
+
   public void heartbeat() throws Exception {
     AllocateResponse allocateResponse = allocate(context.getProgress());
     AMResponse response = allocateResponse.getAMResponse();
@@ -176,10 +180,15 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
     }
     List<Container> allocatedContainers = response.getAllocatedContainers();
 
-    LOG.info("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
-    LOG.info("Available Resource: " + response.getAvailableResources());
-    LOG.info("Num of Allocated Containers: " + response.getAllocatedContainers().size());
-    if (response.getAllocatedContainers().size() > 0) {
+    long currentTime = System.currentTimeMillis();
+    if ((currentTime - prevReportTime.longValue()) >= reportInterval) {
+      LOG.debug("Available Cluster Nodes: " + allocateResponse.getNumClusterNodes());
+      LOG.debug("Num of Allocated Containers: " + allocatedContainers.size());
+      LOG.info("Available Resource: " + response.getAvailableResources());
+      prevReportTime.set(currentTime);
+    }
+
+    if (allocatedContainers.size() > 0) {
       LOG.info("================================================================");
       for (Container container : response.getAllocatedContainers()) {
         LOG.info("> Container Id: " + container.getId());
@@ -189,18 +198,13 @@ public class YarnRMContainerAllocator extends AMRMClientImpl
         LOG.info("> Priority: " + container.getPriority());
       }
       LOG.info("================================================================");
-    }
 
-    Map<ExecutionBlockId, List<Container>> allocated = new HashMap<ExecutionBlockId, List<Container>>();
-    if (allocatedContainers.size() > 0) {
+      Map<ExecutionBlockId, List<Container>> allocated = new HashMap<ExecutionBlockId, List<Container>>();
       for (Container container : allocatedContainers) {
         ExecutionBlockId executionBlockId = subQueryMap.get(container.getPriority());
         SubQueryState state = context.getSubQuery(executionBlockId).getState();
-        if (!(SubQuery.isRunningState(state) && subQueryMap.containsKey(container.getPriority()))) {
+        if (!(SubQuery.isRunningState(state))) {
           releaseAssignedContainer(container.getId());
-          synchronized (subQueryMap) {
-            subQueryMap.remove(container.getPriority());
-          }
         } else {
           if (allocated.containsKey(executionBlockId)) {
             allocated.get(executionBlockId).add(container);
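
With the RM poll interval cut from 1000 ms to 100 ms above, logging the resource report
on every heartbeat would be ten times noisier, so the report is now gated on
prevReportTime to fire at most once per reportInterval. The pattern in isolation, as a
sketch with names mirroring the fields above:

    private final AtomicLong prevReportTime = new AtomicLong(0);
    private static final long REPORT_INTERVAL_MS = 5 * 1000; // 5 seconds

    private void reportThrottled(String message) {
      long now = System.currentTimeMillis();
      if (now - prevReportTime.get() >= REPORT_INTERVAL_MS) {
        LOG.info(message);       // emitted at most once per interval
        prevReportTime.set(now);
      }
    }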

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
index d47cc81..2897b14 100644
--- a/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
+++ b/tajo-core/tajo-core-backend/src/main/java/org/apache/tajo/worker/YarnResourceAllocator.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.client.YarnClientImpl;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.tajo.QueryConf;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.master.TaskRunnerGroupEvent;
 import org.apache.tajo.master.TaskRunnerLauncher;
 import org.apache.tajo.master.YarnTaskRunnerLauncherImpl;
@@ -66,7 +67,10 @@ public class YarnResourceAllocator extends AbstractResourceAllocator {
   @Override
   public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int numTasks) {
     int numClusterNodes = workerContext.getNumClusterNodes();
-    return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * 4);
+
+    TajoConf conf =  (TajoConf)workerContext.getQueryMaster().getConfig();
+    int workerNum = conf.getIntVar(TajoConf.ConfVars.MAX_WORKER_PER_NODE);
+    return numClusterNodes == 0 ? numTasks: Math.min(numTasks, numClusterNodes * workerNum);
   }
 
   @Override
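
The container request is now capped by the configured workers per node instead of a
hard-coded factor of 4. For example (values illustrative), with 10 cluster nodes and
MAX_WORKER_PER_NODE set to 8, a query with 200 tasks requests min(200, 10 * 8) = 80
containers, while a 50-task query still requests all 50.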

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
index 5c4fa86..d8e6ea1 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/CSVFile.java
@@ -23,25 +23,26 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStat;
 import org.apache.tajo.datum.*;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.storage.exception.AlreadyExistsStorageException;
 import org.apache.tajo.storage.json.StorageGsonHelper;
+import org.apache.tajo.storage.compress.CodecPool;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
+import java.io.*;
 import java.util.Arrays;
 
 public class CSVFile {
   public static final String DELIMITER = "csvfile.delimiter";
   public static final String DELIMITER_DEFAULT = "|";
+  public static final byte LF = '\n';
   private static final Log LOG = LogFactory.getLog(CSVFile.class);
 
   public static class CSVAppender extends FileAppender {
@@ -49,8 +50,14 @@ public class CSVFile {
     private final Schema schema;
     private final FileSystem fs;
     private FSDataOutputStream fos;
+    private DataOutputStream outputStream;
+    private CompressionOutputStream deflateFilter;
     private String delimiter;
     private TableStatistics stats = null;
+    private Compressor compressor;
+    private CompressionCodecFactory codecFactory;
+    private CompressionCodec codec;
+    private Path compressedPath;
 
     public CSVAppender(Configuration conf, final TableMeta meta,
                        final Path path) throws IOException {
@@ -67,11 +74,31 @@ public class CSVFile {
         throw new FileNotFoundException(path.toString());
       }
 
-      if (fs.exists(path)) {
-        throw new AlreadyExistsStorageException(path);
-      }
+      String codecName = this.meta.getOption(TableMeta.COMPRESSION_CODEC);
+      if(!StringUtils.isEmpty(codecName)){
+        codecFactory = new CompressionCodecFactory(conf);
+        codec = codecFactory.getCodecByClassName(codecName);
+        compressor =  CodecPool.getCompressor(codec);
+        if(compressor != null) compressor.reset();  // the built-in gzip codec returns a null compressor
+
+        String extension = codec.getDefaultExtension();
+        compressedPath = path.suffix(extension);
+
+        if (fs.exists(compressedPath)) {
+          throw new AlreadyExistsStorageException(compressedPath);
+        }
 
-      fos = fs.create(path);
+        fos = fs.create(compressedPath);
+        deflateFilter = codec.createOutputStream(fos, compressor);
+        outputStream = new DataOutputStream(new BufferedOutputStream(deflateFilter));
+
+      } else {
+        if (fs.exists(path)) {
+          throw new AlreadyExistsStorageException(path);
+        }
+        fos = fs.create(path);
+        outputStream = fos;
+      }
 
       if (enabledStats) {
         this.stats = new TableStatistics(this.schema);
@@ -156,8 +183,7 @@ public class CSVFile {
         sb.deleteCharAt(sb.length() - 1);
       }
       sb.append('\n');
-      fos.writeBytes(sb.toString());
-
+      outputStream.write(sb.toString().getBytes());
       // Statistical section
       if (enabledStats) {
         stats.incrementRow();
@@ -171,16 +197,32 @@ public class CSVFile {
 
     @Override
     public void flush() throws IOException {
-      fos.flush();
+      outputStream.flush();
     }
 
     @Override
     public void close() throws IOException {
       // Statistical section
       if (enabledStats) {
-        stats.setNumBytes(fos.getPos());
+        stats.setNumBytes(getOffset());
+      }
+
+      try {
+        flush();
+
+        if(deflateFilter != null) {
+          deflateFilter.finish();
+          deflateFilter.resetState();
+          deflateFilter = null;
+        }
+
+        fos.close();
+      } finally {
+        if (compressor != null) {
+          CodecPool.returnCompressor(compressor);
+          compressor = null;
+        }
       }
-      fos.close();
     }
 
     @Override
@@ -191,21 +233,36 @@ public class CSVFile {
         return null;
       }
     }
+
+    public boolean isCompress() {
+      return compressor != null;
+    }
+
+    public String getExtension() {
+      return codec != null ? codec.getDefaultExtension() : "";
+    }
   }
 
   public static class CSVScanner extends FileScanner implements SeekableScanner {
     public CSVScanner(Configuration conf, final TableMeta meta,
                       final Fragment fragment) throws IOException {
       super(conf, meta, fragment);
+      factory = new CompressionCodecFactory(conf);
+      codec = factory.getCodec(fragment.getPath());
     }
 
-    private static final byte LF = '\n';
-    private final static long DEFAULT_BUFFER_SIZE = 256 * 1024;
-    private long bufSize;
+    private final static int DEFAULT_BUFFER_SIZE = 256 * 1024;
+    private int bufSize;
     private char delimiter;
     private FileSystem fs;
     private FSDataInputStream fis;
-    private long startOffset, length, startPos;
+    private InputStream is; // decompressed stream
+    private CompressionCodecFactory factory;
+    private CompressionCodec codec;
+    private Decompressor decompressor;
+    private Seekable filePosition;
+    private boolean splittable = true;
+    private long startOffset, length;
     private byte[] buf = null;
     private String[] tuples = null;
     private long[] tupleOffsets = null;
@@ -214,6 +271,7 @@ public class CSVFile {
     private long pageStart = -1;
     private long prevTailLen = -1;
     private int[] targetColumnIndexes;
+    private boolean eof = false;
 
     @Override
     public void init() throws IOException {
@@ -224,12 +282,35 @@ public class CSVFile {
       this.delimiter = delim.charAt(0);
 
       // Fragment information
-      this.fs = fragment.getPath().getFileSystem(this.conf);
-      this.fis = this.fs.open(fragment.getPath());
-      this.startOffset = fragment.getStartOffset();
-      this.length = fragment.getLength();
-      tuples = new String[0];
+      fs = fragment.getPath().getFileSystem(conf);
+      fis = fs.open(fragment.getPath());
+      startOffset = fragment.getStartOffset();
+      length = fragment.getLength();
+
+      if(startOffset > 0) startOffset--; // rewind one byte so the skip-to-LF below consumes the previous line feed
+
+      if (codec != null) {
+        decompressor = CodecPool.getDecompressor(codec);
+        if (codec instanceof SplittableCompressionCodec) {
+          SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream(
+              fis, decompressor, startOffset, startOffset + length,
+              SplittableCompressionCodec.READ_MODE.BYBLOCK);
+
+          startOffset = cIn.getAdjustedStart();
+          length = cIn.getAdjustedEnd() - startOffset;
+          filePosition = cIn;
+          is = cIn;
+        } else {
+          is = new DataInputStream(codec.createInputStream(fis, decompressor));
+          splittable = false;
+        }
+      } else {
+        fis.seek(startOffset);
+        filePosition = fis;
+        is = fis;
+      }
 
+      tuples = new String[0];
       if (targets == null) {
         targets = schema.toArray();
       }
@@ -246,20 +327,31 @@ public class CSVFile {
       }
 
       if (startOffset != 0) {
-        fis.seek(startOffset - 1);
-        while (fis.readByte() != LF) {
+        int rbyte;
+        while ((rbyte = is.read()) != LF) {
+          if(rbyte == -1) break;
         }
       }
-      startPos = fis.getPos();
+
       if (fragmentable() < 1) {
-        fis.close();
+        close();
         return;
       }
       page();
     }
 
     private long fragmentable() throws IOException {
-      return startOffset + length - fis.getPos();
+      return startOffset + length - getFilePosition();
+    }
+
+    private long getFilePosition() throws IOException {
+      long retVal;
+      if (filePosition != null) {
+        retVal = filePosition.getPos();
+      } else {
+        retVal = fis.getPos();
+      }
+      return retVal;
     }
 
     private void page() throws IOException {
@@ -267,41 +359,46 @@ public class CSVFile {
       currentIdx = 0;
 
       // Buffer size set
-      if (fragmentable() < DEFAULT_BUFFER_SIZE) {
-        bufSize = fragmentable();
+      if (isSplittable() &&  fragmentable() < DEFAULT_BUFFER_SIZE) {
+        bufSize = (int)fragmentable();
       }
 
-
       if (this.tail == null || this.tail.length == 0) {
-        this.pageStart = fis.getPos();
+        this.pageStart = getFilePosition();
         this.prevTailLen = 0;
       } else {
-        this.pageStart = fis.getPos() - this.tail.length;
+        this.pageStart = getFilePosition() - this.tail.length;
         this.prevTailLen = this.tail.length;
       }
 
       // Read
       int rbyte;
-      if (fis.getPos() == startPos) {
-        buf = new byte[(int) bufSize];
-        rbyte = fis.read(buf);
+      buf = new byte[bufSize];
+      rbyte = is.read(buf);
+
+      if(rbyte < 0){
+        eof = true; //EOF
+        return;
+      }
+
+      if (prevTailLen == 0) {
         tail = new byte[0];
         tuples = StringUtils.split(new String(buf, 0, rbyte), (char)LF);
       } else {
-        buf = new byte[(int) bufSize];
-        rbyte = fis.read(buf);
         tuples = StringUtils.split(new String(tail) + new String(buf, 0, rbyte), (char)LF);
+        tail = null;
       }
 
       // Check tail
       if ((char) buf[rbyte - 1] != LF) {
-        if (fragmentable() < 1) {
+        if (isSplittable() && (fragmentable() < 1 || rbyte != bufSize)) {
           int cnt = 0;
-          byte[] temp = new byte[(int)DEFAULT_BUFFER_SIZE];
+          byte[] temp = new byte[DEFAULT_BUFFER_SIZE];
           // Read bytes
-          while ((temp[cnt] = fis.readByte()) != LF) {
+          while ((temp[cnt] = (byte)is.read()) != LF) {
             cnt++;
           }
+
           // Replace tuple
           tuples[tuples.length - 1] = tuples[tuples.length - 1] + new String(temp, 0, cnt);
           validIdx = tuples.length;
@@ -313,7 +410,8 @@ public class CSVFile {
         tail = new byte[0];
         validIdx = tuples.length;
       }
-      makeTupleOffset();
+
+      if(!isCompress()) makeTupleOffset();
     }
 
     private void makeTupleOffset() {
@@ -329,15 +427,27 @@ public class CSVFile {
     public Tuple next() throws IOException {
       try {
         if (currentIdx == validIdx) {
-          if (fragmentable() < 1) {
-            fis.close();
+          if (isSplittable() && fragmentable() < 1) {
+            close();
             return null;
           } else {
             page();
           }
+
+          if(eof){
+            close();
+            return null;
+          }
         }
-        long offset = this.tupleOffsets[currentIdx];
+
+
+        long offset = -1;
+        if(!isCompress()){
+          offset = this.tupleOffsets[currentIdx];
+        }
+
         String[] cells = StringUtils.splitPreserveAllTokens(tuples[currentIdx++], delimiter);
+
         int targetLen = targets.length;
         VTuple tuple = new VTuple(columnNum);
         Column field;
@@ -399,12 +509,16 @@ public class CSVFile {
         }
         return tuple;
       } catch (Throwable t) {
-        LOG.error("Tuple list length: " + tuples.length, t);
+        LOG.error("Tuple list length: " + (tuples != null ? tuples.length : 0), t);
         LOG.error("Tuple list current index: " + currentIdx, t);
       }
       return null;
     }
 
+    private boolean isCompress() {
+      return codec != null;
+    }
+
     @Override
     public void reset() throws IOException {
       init();
@@ -412,7 +526,14 @@ public class CSVFile {
 
     @Override
     public void close() throws IOException {
-      fis.close();
+      try {
+        is.close();
+      } finally {
+        if (decompressor != null) {
+          CodecPool.returnDecompressor(decompressor);
+          decompressor = null;
+        }
+      }
     }
 
     @Override
@@ -431,14 +552,16 @@ public class CSVFile {
 
     @Override
     public void seek(long offset) throws IOException {
+      if(isCompress()) throw new UnsupportedException();
+
       int tupleIndex = Arrays.binarySearch(this.tupleOffsets, offset);
       if (tupleIndex > -1) {
         this.currentIdx = tupleIndex;
-      } else if (offset >= this.pageStart + this.bufSize
+      } else if (isSplittable() && offset >= this.pageStart + this.bufSize
           + this.prevTailLen - this.tail.length || offset <= this.pageStart) {
-        fis.seek(offset);
+        filePosition.seek(offset);
         tail = new byte[0];
-        buf = new byte[(int) DEFAULT_BUFFER_SIZE];
+        buf = new byte[DEFAULT_BUFFER_SIZE];
         bufSize = DEFAULT_BUFFER_SIZE;
         this.currentIdx = 0;
         this.validIdx = 0;
@@ -455,6 +578,8 @@ public class CSVFile {
 
     @Override
     public long getNextOffset() throws IOException {
+      if(isCompress()) throw new UnsupportedException();
+
       if (this.currentIdx == this.validIdx) {
         if (fragmentable() < 1) {
           return -1;
@@ -467,7 +592,7 @@ public class CSVFile {
 
     @Override
     public boolean isSplittable(){
-      return true;
+      return splittable;
     }
   }
 }
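
Taken together, the scanner now handles a fragment in one of three ways: no codec (seek
straight to the fragment offset), a splittable codec such as bzip2 (let the codec adjust
the split to a compression-block boundary), or a non-splittable codec such as gzip
(stream the whole file and report isSplittable() == false). A condensed sketch of that
decision, assuming Hadoop's compression API and the new CodecPool:

    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(path); // keyed by extension, e.g. ".bz2"
    InputStream in;
    if (codec == null) {
      FSDataInputStream fsin = fs.open(path);
      fsin.seek(start);                              // plain text: seek directly
      in = fsin;
    } else if (codec instanceof SplittableCompressionCodec) {
      SplitCompressionInputStream cin = ((SplittableCompressionCodec) codec)
          .createInputStream(fs.open(path), CodecPool.getDecompressor(codec),
              start, start + length, SplittableCompressionCodec.READ_MODE.BYBLOCK);
      start = cin.getAdjustedStart();                // codec realigns the split
      length = cin.getAdjustedEnd() - start;
      in = cin;
    } else {
      // whole-file stream; such a fragment must cover the entire file
      in = codec.createInputStream(fs.open(path), CodecPool.getDecompressor(codec));
    }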

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
new file mode 100644
index 0000000..baeda8c
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.compress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.Compressor;
+import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DoNotPool;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A global compressor/decompressor pool used to save and reuse (possibly
+ * native) compression/decompression codecs.
+ */
+public final class CodecPool {
+  private static final Log LOG = LogFactory.getLog(CodecPool.class);
+
+  /**
+   * A global compressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
+      new HashMap<Class<Compressor>, List<Compressor>>();
+
+  /**
+   * A global decompressor pool used to save the expensive
+   * construction/destruction of (possibly native) decompression codecs.
+   */
+  private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
+      new HashMap<Class<Decompressor>, List<Decompressor>>();
+
+  private static <T> T borrow(Map<Class<T>, List<T>> pool,
+      Class<? extends T> codecClass) {
+    T codec = null;
+
+    // Check if an appropriate codec is available
+    synchronized (pool) {
+      if (pool.containsKey(codecClass)) {
+        List<T> codecList = pool.get(codecClass);
+
+        if (codecList != null) {
+          synchronized (codecList) {
+            if (!codecList.isEmpty()) {
+              codec = codecList.remove(codecList.size() - 1);
+            }
+          }
+        }
+      }
+    }
+
+    return codec;
+  }
+
+  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
+    if (codec != null) {
+      Class<T> codecClass = (Class<T>) codec.getClass();
+      synchronized (pool) {
+        if (!pool.containsKey(codecClass)) {
+          pool.put(codecClass, new ArrayList<T>());
+        }
+
+        List<T> codecList = pool.get(codecClass);
+        synchronized (codecList) {
+          codecList.add(codec);
+        }
+      }
+    }
+  }
+
+  /**
+   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Compressor</code>
+   * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor
+   * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
+   *         from the pool or a new one
+   */
+  public static Compressor getCompressor(CompressionCodec codec, Configuration conf) {
+    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
+    if (compressor == null) {
+      compressor = codec.createCompressor();
+      LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]");
+    } else {
+      compressor.reinit(conf);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Got recycled compressor");
+      }
+    }
+    return compressor;
+  }
+
+  public static Compressor getCompressor(CompressionCodec codec) {
+    return getCompressor(codec, null);
+  }
+
+  /**
+   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
+   * pool or a new one.
+   *
+   * @param codec
+   *          the <code>CompressionCodec</code> for which to get the
+   *          <code>Decompressor</code>
+   * @return <code>Decompressor</code> for the given
+   *         <code>CompressionCodec</code> from the pool or a new one
+   */
+  public static Decompressor getDecompressor(CompressionCodec codec) {
+    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
+        .getDecompressorType());
+    if (decompressor == null) {
+      decompressor = codec.createDecompressor();
+      LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]");
+    } else {
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Got recycled decompressor");
+      }
+    }
+    return decompressor;
+  }
+
+  /**
+   * Return the {@link Compressor} to the pool.
+   *
+   * @param compressor
+   *          the <code>Compressor</code> to be returned to the pool
+   */
+  public static void returnCompressor(Compressor compressor) {
+    if (compressor == null) {
+      return;
+    }
+    // if the compressor can't be reused, don't pool it.
+    if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      return;
+    }
+    compressor.reset();
+    payback(COMPRESSOR_POOL, compressor);
+  }
+
+  /**
+   * Return the {@link Decompressor} to the pool.
+   *
+   * @param decompressor
+   *          the <code>Decompressor</code> to be returned to the pool
+   */
+  public static void returnDecompressor(Decompressor decompressor) {
+    if (decompressor == null) {
+      return;
+    }
+    // if the decompressor can't be reused, don't pool it.
+    if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) {
+      return;
+    }
+    decompressor.reset();
+    payback(DECOMPRESSOR_POOL, decompressor);
+  }
+
+  private CodecPool() {
+    // prevent instantiation
+  }
+}
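
Every borrow is expected to be paired with a return, normally in a finally block, so
that (possibly native) codec instances are recycled instead of leaking; this is the
shape CSVAppender.close() above follows. A minimal sketch:

    Compressor compressor = CodecPool.getCompressor(codec);
    try {
      CompressionOutputStream out = codec.createOutputStream(fos, compressor);
      // ... write, then out.finish() before closing the underlying stream ...
    } finally {
      CodecPool.returnCompressor(compressor); // @DoNotPool compressors are dropped, not pooled
    }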

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/CodecPool.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/CodecPool.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/CodecPool.java
deleted file mode 100644
index e8f9b5c..0000000
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/CodecPool.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.rcfile;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A global compressor/decompressor pool used to save and reuse (possibly
- * native) compression/decompression codecs.
- */
-public final class CodecPool {
-  private static final Log LOG = LogFactory.getLog(CodecPool.class);
-
-  /**
-   * A global compressor pool used to save the expensive
-   * construction/destruction of (possibly native) decompression codecs.
-   */
-  private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL =
-      new HashMap<Class<Compressor>, List<Compressor>>();
-
-  /**
-   * A global decompressor pool used to save the expensive
-   * construction/destruction of (possibly native) decompression codecs.
-   */
-  private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL =
-      new HashMap<Class<Decompressor>, List<Decompressor>>();
-
-  private static <T> T borrow(Map<Class<T>, List<T>> pool,
-      Class<? extends T> codecClass) {
-    T codec = null;
-
-    // Check if an appropriate codec is available
-    synchronized (pool) {
-      if (pool.containsKey(codecClass)) {
-        List<T> codecList = pool.get(codecClass);
-
-        if (codecList != null) {
-          synchronized (codecList) {
-            if (!codecList.isEmpty()) {
-              codec = codecList.remove(codecList.size() - 1);
-            }
-          }
-        }
-      }
-    }
-
-    return codec;
-  }
-
-  private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
-    if (codec != null) {
-      Class<T> codecClass = (Class<T>) codec.getClass();
-      synchronized (pool) {
-        if (!pool.containsKey(codecClass)) {
-          pool.put(codecClass, new ArrayList<T>());
-        }
-
-        List<T> codecList = pool.get(codecClass);
-        synchronized (codecList) {
-          codecList.add(codec);
-        }
-      }
-    }
-  }
-
-  /**
-   * Get a {@link Compressor} for the given {@link CompressionCodec} from the
-   * pool or a new one.
-   *
-   * @param codec
-   *          the <code>CompressionCodec</code> for which to get the
-   *          <code>Compressor</code>
-   * @return <code>Compressor</code> for the given <code>CompressionCodec</code>
-   *         from the pool or a new one
-   */
-  public static Compressor getCompressor(CompressionCodec codec) {
-    Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType());
-    if (compressor == null) {
-      compressor = codec.createCompressor();
-      LOG.info("Got brand-new compressor");
-    } else {
-      LOG.debug("Got recycled compressor");
-    }
-    return compressor;
-  }
-
-  /**
-   * Get a {@link Decompressor} for the given {@link CompressionCodec} from the
-   * pool or a new one.
-   *
-   * @param codec
-   *          the <code>CompressionCodec</code> for which to get the
-   *          <code>Decompressor</code>
-   * @return <code>Decompressor</code> for the given
-   *         <code>CompressionCodec</code> the pool or a new one
-   */
-  public static Decompressor getDecompressor(CompressionCodec codec) {
-    Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec
-        .getDecompressorType());
-    if (decompressor == null) {
-      decompressor = codec.createDecompressor();
-      LOG.info("Got brand-new decompressor");
-    } else {
-      LOG.debug("Got recycled decompressor");
-    }
-    return decompressor;
-  }
-
-  /**
-   * Return the {@link Compressor} to the pool.
-   *
-   * @param compressor
-   *          the <code>Compressor</code> to be returned to the pool
-   */
-  public static void returnCompressor(Compressor compressor) {
-    if (compressor == null) {
-      return;
-    }
-    compressor.reset();
-    payback(COMPRESSOR_POOL, compressor);
-  }
-
-  /**
-   * Return the {@link Decompressor} to the pool.
-   * 
-   * @param decompressor
-   *          the <code>Decompressor</code> to be returned to the pool
-   */
-  public static void returnDecompressor(Decompressor decompressor) {
-    if (decompressor == null) {
-      return;
-    }
-    decompressor.reset();
-    payback(DECOMPRESSOR_POOL, decompressor);
-  }
-
-  private CodecPool() {
-    // prevent instantiation
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 7920dfa..245aba5 100644
--- a/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-core/tajo-core-storage/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -425,7 +425,7 @@ public class RCFile {
           - skipped];
       this.codec = codec;
       if (codec != null) {
-        valDecompressor = CodecPool.getDecompressor(codec);
+        valDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
         deflatFilter = codec.createInputStream(decompressBuffer,
             valDecompressor);
       }
@@ -525,7 +525,7 @@ public class RCFile {
       }
       if (codec != null) {
         IOUtils.closeStream(decompressBuffer);
-        CodecPool.returnDecompressor(valDecompressor);
+        org.apache.tajo.storage.compress.CodecPool.returnDecompressor(valDecompressor);
       }
     }
 
@@ -894,7 +894,7 @@ public class RCFile {
       int valueLength = 0;
       if (isCompressed) {
         ReflectionUtils.setConf(codec, this.conf);
-        compressor = CodecPool.getCompressor(codec);
+        compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
         valueBuffer = new NonSyncDataOutputBuffer();
         deflateFilter = codec.createOutputStream(valueBuffer, compressor);
         deflateOut = new DataOutputStream(deflateFilter);
@@ -935,7 +935,7 @@ public class RCFile {
         throw new IOException("negative length keys not allowed: " + key);
       }
       if (compressor != null) {
-        CodecPool.returnCompressor(compressor);
+        org.apache.tajo.storage.compress.CodecPool.returnCompressor(compressor);
       }
 
       // Write the key out
@@ -975,7 +975,7 @@ public class RCFile {
       out.writeInt(keyLength); // key portion length
 
       if(this.isCompressed()) {
-        Compressor compressor = CodecPool.getCompressor(codec);
+        Compressor compressor = org.apache.tajo.storage.compress.CodecPool.getCompressor(codec);
         NonSyncDataOutputBuffer compressionBuffer =
             new NonSyncDataOutputBuffer();
         CompressionOutputStream deflateFilter =
@@ -1267,7 +1267,7 @@ public class RCFile {
           throw new IllegalArgumentException(
               "Unknown codec: " + codecClassname, cnfe);
         }
-        keyDecompressor = CodecPool.getDecompressor(codec);
+        keyDecompressor = org.apache.tajo.storage.compress.CodecPool.getDecompressor(codec);
       }
 
       metadata = new Metadata();
@@ -1732,7 +1732,7 @@ public class RCFile {
       currentValue.close();
       if (decompress) {
         IOUtils.closeStream(keyDecompressedData);
-        CodecPool.returnDecompressor(keyDecompressor);
+        org.apache.tajo.storage.compress.CodecPool.returnDecompressor(keyDecompressor);
       }
     }
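
RCFile switches to the relocated pool by fully qualifying each call site, which keeps
this diff free of import churn while rcfile.CodecPool is deleted in the same commit; a
single "import org.apache.tajo.storage.compress.CodecPool;" would be the equivalent,
tidier alternative.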
 

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
new file mode 100644
index 0000000..42c68b6
--- /dev/null
+++ b/tajo-core/tajo-core-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.*;
+import org.apache.hadoop.util.NativeCodeLoader;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStat;
+import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.datum.DatumFactory;
+import org.apache.tajo.util.CommonTestingUtil;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class TestCompressionStorages {
+  private TajoConf conf;
+  private static String TEST_PATH = "target/test-data/TestCompressionStorages";
+
+  private StoreType storeType;
+  private Path testDir;
+  private FileSystem fs;
+
+  public TestCompressionStorages(StoreType type) throws IOException {
+    this.storeType = type;
+    conf = new TajoConf();
+
+    testDir = CommonTestingUtil.getTestDir(TEST_PATH);
+    fs = testDir.getFileSystem(conf);
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> generateParameters() {
+    return Arrays.asList(new Object[][]{
+        {StoreType.CSV}
+    });
+  }
+
+  @Test
+  public void testDeflateCodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, DeflateCodec.class);
+  }
+
+  @Test
+  public void testGzipCodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, GzipCodec.class);
+  }
+
+  @Test
+  public void testSnappyCodecCompressionData() throws IOException {
+    if (SnappyCodec.isNativeCodeLoaded()) {
+      storageCompressionTest(storeType, SnappyCodec.class);
+    }
+  }
+
+  @Test
+  public void testBzip2CodecCompressionData() throws IOException {
+    storageCompressionTest(storeType, BZip2Codec.class);
+  }
+
+  @Test
+  public void testLz4CodecCompressionData() throws IOException {
+    if (NativeCodeLoader.isNativeCodeLoaded() && Lz4Codec.isNativeCodeLoaded())
+      storageCompressionTest(storeType, Lz4Codec.class);
+  }
+
+  @Test
+  public void testSplitCompressionData() throws IOException {
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, StoreType.CSV);
+    meta.putOption("compression.codec", BZip2Codec.class.getCanonicalName());
+
+    Path tablePath = new Path(testDir, "SplitCompression");
+    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    appender.enableStats();
+
+    appender.init();
+
+    String extension = "";
+    if (appender instanceof CSVFile.CSVAppender) {
+      extension = ((CSVFile.CSVAppender) appender).getExtension();
+    }
+
+    int tupleNum = 100000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(2);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStat stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+    tablePath = tablePath.suffix(extension);
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    long randomNum = (long) (Math.random() * fileLen) + 1;
+
+    Fragment[] tablets = new Fragment[2];
+    tablets[0] = new Fragment("SplitCompression", tablePath, meta, 0, randomNum);
+    tablets[1] = new Fragment("SplitCompression", tablePath, meta, randomNum, (fileLen - randomNum));
+
+    Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+
+    scanner = StorageManager.getScanner(conf, meta, tablets[1], schema);
+    scanner.init();
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+
+    scanner.close();
+    assertEquals(tupleNum, tupleCnt);
+  }
+
+  private void storageCompressionTest(StoreType storeType, Class<? extends CompressionCodec> codec) throws IOException {
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+
+    TableMeta meta = CatalogUtil.newTableMeta(schema, storeType);
+    meta.putOption("compression.codec", codec.getCanonicalName());
+
+    String fileName = "Compression_" + codec.getSimpleName();
+    Path tablePath = new Path(testDir, fileName);
+    Appender appender = StorageManager.getAppender(conf, meta, tablePath);
+    appender.enableStats();
+
+    appender.init();
+
+    String extension = "";
+    if (appender instanceof CSVFile.CSVAppender) {
+      extension = ((CSVFile.CSVAppender) appender).getExtension();
+    }
+
+    int tupleNum = 10000;
+    VTuple vTuple;
+
+    for (int i = 0; i < tupleNum; i++) {
+      vTuple = new VTuple(2);
+      vTuple.put(0, DatumFactory.createInt4(i + 1));
+      vTuple.put(1, DatumFactory.createInt8(25L));
+      appender.addTuple(vTuple);
+    }
+    appender.close();
+
+    TableStat stat = appender.getStats();
+    assertEquals(tupleNum, stat.getNumRows().longValue());
+    tablePath = tablePath.suffix(extension);
+    FileStatus status = fs.getFileStatus(tablePath);
+    long fileLen = status.getLen();
+    Fragment[] tablets = new Fragment[1];
+    tablets[0] = new Fragment(fileName, tablePath, meta, 0, fileLen);
+
+    Scanner scanner = StorageManager.getScanner(conf, meta, tablets[0], schema);
+    scanner.init();
+    int tupleCnt = 0;
+    Tuple tuple;
+    while ((tuple = scanner.next()) != null) {
+      tupleCnt++;
+    }
+    scanner.close();
+    assertEquals(tupleNum, tupleCnt);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tajo/blob/bc8e1804/tajo-dist/src/main/bin/tajo
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo
index 5d5774e..baf7c7b 100755
--- a/tajo-dist/src/main/bin/tajo
+++ b/tajo-dist/src/main/bin/tajo
@@ -243,6 +243,7 @@ export HADOOP_EXT_CLASSPATH
 # Append $HADOOP_JAR_CLASSPATH to $CLASSPATH
 CLASSPATH="${CLASSPATH}:${HADOOP_EXT_CLASSPATH}"
 
+HDFS_LIBRARY_PATH="${HADOOP_HOME}/lib/native/"
 ##############################################################################
 # Hadoop Home Configuration End
 ##############################################################################
@@ -320,6 +321,12 @@ if [ -d "${TAJO_HOME}/build/native" -o -d "${TAJO_HOME}/lib/native" ]; then
   fi
 fi
 
+if [ "x$JAVA_LIBRARY_PATH" != "x" ]; then
+  JAVA_LIBRARY_PATH=${JAVA_LIBRARY_PATH}:${HDFS_LIBRARY_PATH}
+else
+  JAVA_LIBRARY_PATH=${HDFS_LIBRARY_PATH}
+fi
+
 # cygwin path translation
 if $cygwin; then
   JAVA_LIBRARY_PATH=`cygpath -p "$JAVA_LIBRARY_PATH"`
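
Codecs such as Snappy and LZ4 are backed by the libhadoop native library, so the
launcher now appends ${HADOOP_HOME}/lib/native to java.library.path. A quick way to
confirm which codecs the native loader can satisfy (assuming a Hadoop 2.x client that
ships the checknative command):

    hadoop checknative -a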

