tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jihoon...@apache.org
Subject tajo git commit: TAJO-1975: Gathering fine-grained column statistics for range shuffle.
Date Fri, 13 Nov 2015 04:37:52 GMT
Repository: tajo
Updated Branches:
  refs/heads/master ed6603792 -> 011fcd922


TAJO-1975: Gathering fine-grained column statistics for range shuffle.

Closes #859


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/011fcd92
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/011fcd92
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/011fcd92

Branch: refs/heads/master
Commit: 011fcd922d0a809e8d6d88de441594fd13f649a0
Parents: ed66037
Author: Jihoon Son <jihoonson@apache.org>
Authored: Fri Nov 13 13:37:39 2015 +0900
Committer: Jihoon Son <jihoonson@apache.org>
Committed: Fri Nov 13 13:37:39 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../physical/HashShuffleFileWriteExec.java      |  12 +-
 .../physical/RangeShuffleFileWriteExec.java     |   2 +-
 .../java/org/apache/tajo/storage/Appender.java  |  56 ++++-
 .../apache/tajo/storage/TableStatistics.java    |  74 +++----
 .../storage/hbase/AbstractHBaseAppender.java    |  23 +-
 .../tajo/storage/hbase/HBasePutAppender.java    |   4 +-
 .../tajo/storage/hbase/HFileAppender.java       |   6 +-
 .../org/apache/tajo/storage/FileAppender.java   |  26 ++-
 .../tajo/storage/HashShuffleAppender.java       | 209 -------------------
 .../storage/HashShuffleAppenderManager.java     |  14 +-
 .../storage/HashShuffleAppenderWrapper.java     | 187 +++++++++++++++++
 .../java/org/apache/tajo/storage/RawFile.java   |  20 +-
 .../java/org/apache/tajo/storage/RowFile.java   |  10 +-
 .../apache/tajo/storage/avro/AvroAppender.java  |  27 ++-
 .../apache/tajo/storage/orc/ORCAppender.java    |  20 +-
 .../tajo/storage/parquet/ParquetAppender.java   |  35 ++--
 .../storage/rawfile/DirectRawFileWriter.java    |  10 +-
 .../org/apache/tajo/storage/rcfile/RCFile.java  |  27 +--
 .../sequencefile/SequenceFileAppender.java      |  10 +-
 .../tajo/storage/text/DelimitedTextFile.java    |  10 +-
 .../tajo/storage/TestCompressionStorages.java   |   3 +-
 22 files changed, 405 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index c36ced1..98a780e 100644
--- a/CHANGES
+++ b/CHANGES
@@ -61,6 +61,8 @@ Release 0.12.0 - unreleased
 
   TASKS
 
+    TAJO-1975: Gathering fine-grained column statistics for range shuffle. (jihoon)
+
     TAJO-1963: Add more configuration descriptions to document. (jihoon)
 
     TAJO-1970: Change the null first syntax. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
index a72a375..35a204a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java
@@ -27,7 +27,7 @@ import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf.ConfVars;
 import org.apache.tajo.plan.logical.ShuffleFileWriteNode;
-import org.apache.tajo.storage.HashShuffleAppender;
+import org.apache.tajo.storage.HashShuffleAppenderWrapper;
 import org.apache.tajo.storage.HashShuffleAppenderManager;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.worker.TaskAttemptContext;
@@ -45,7 +45,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
   private ShuffleFileWriteNode plan;
   private final TableMeta meta;
   private Partitioner partitioner;
-  private Map<Integer, HashShuffleAppender> appenderMap = new HashMap<>();
+  private Map<Integer, HashShuffleAppenderWrapper> appenderMap = new HashMap<>();
   private final int numShuffleOutputs;
   private final int [] shuffleKeyIds;
   private HashShuffleAppenderManager hashShuffleAppenderManager;
@@ -79,8 +79,8 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
     super.init();
   }
   
-  private HashShuffleAppender getAppender(int partId) throws IOException {
-    HashShuffleAppender appender = appenderMap.get(partId);
+  private HashShuffleAppenderWrapper getAppender(int partId) throws IOException {
+    HashShuffleAppenderWrapper appender = appenderMap.get(partId);
     if (appender == null) {
       appender = hashShuffleAppenderManager.getAppender(context.getConf(),
           context.getTaskId().getTaskId().getExecutionBlockId(), partId, meta, outSchema);
@@ -113,7 +113,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
         if (tupleCount >= numHashShuffleBufferTuples) {
           for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) {
             int appendPartId = entry.getKey();
-            HashShuffleAppender appender = getAppender(appendPartId);
+            HashShuffleAppenderWrapper appender = getAppender(appendPartId);
             int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
             writtenBytes += appendedSize;
             entry.getValue().clear();
@@ -125,7 +125,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec {
       // processing remained tuples
       for (Map.Entry<Integer, TupleList> entry : partitionTuples.entrySet()) {
         int appendPartId = entry.getKey();
-        HashShuffleAppender appender = getAppender(appendPartId);
+        HashShuffleAppenderWrapper appender = getAppender(appendPartId);
         int appendedSize = appender.addTuples(context.getTaskId(), entry.getValue());
         writtenBytes += appendedSize;
         entry.getValue().clear();

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
index 4d01b00..bcd2b17 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java
@@ -83,7 +83,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec {
     fs.mkdirs(storeTablePath);
     this.appender = (FileAppender) ((FileTablespace) TablespaceManager.getDefault())
         .getAppender(meta, outSchema, new Path(storeTablePath, "output"));
-    this.appender.enableStats();
+    this.appender.enableStats(keySchema.getAllColumns());
     this.appender.init();
     this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"),
         BSTIndex.TWO_LEVEL_INDEX, keySchema, comp);

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
index c5e96ac..9e11799 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/Appender.java
@@ -18,24 +18,74 @@
 
 package org.apache.tajo.storage;
 
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.statistics.TableStats;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
+/**
+ *
+ * Interface for appender.
+ * Every appender that writes some data to the underlying storage needs to implement this interface.
+ */
 public interface Appender extends Closeable {
 
+  /**
+   * Initialize the appender.
+   *
+   * @throws IOException
+   */
   void init() throws IOException;
 
+  /**
+   * Write the given tuple.
+   *
+   * @param t
+   * @throws IOException
+   */
   void addTuple(Tuple t) throws IOException;
-  
+
+  /**
+   * Flush buffered tuples if they exist.
+   *
+   * @throws IOException
+   */
   void flush() throws IOException;
 
+  /**
+   * The total size of written output.
+   * The result value can be different from the real size due to the buffered data.
+   *
+   * @return
+   * @throws IOException
+   */
   long getEstimatedOutputSize() throws IOException;
-  
+
+  /**
+   * Close the appender.
+   *
+   * @throws IOException
+   */
   void close() throws IOException;
 
+  /**
+   * Enable statistics collection for the output table.
+   */
   void enableStats();
-  
+
+  /**
+   * Enable statistics collection for the output table as well as its columns.
+   * Note that statistics are collected on only the specified columns.
+   * @param columnList a list of columns on which statistics are collected
+   */
+  void enableStats(List<Column> columnList);
+
+  /**
+   * Return collected statistics.
+   *
+   * @return statistics
+   */
   TableStats getStats();
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
index aa33ea3..31e07ee 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java
@@ -20,11 +20,11 @@ package org.apache.tajo.storage;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.TajoConstants;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.statistics.ColumnStats;
 import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.common.TajoDataTypes.DataType;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.datum.Datum;
 
@@ -33,30 +33,26 @@ import org.apache.tajo.datum.Datum;
  */
 public class TableStatistics {
   private static final Log LOG = LogFactory.getLog(TableStatistics.class);
-  private Schema schema;
-  private VTuple minValues;
-  private VTuple maxValues;
-  private long [] numNulls;
+  private final Schema schema;
+  private final VTuple minValues;
+  private final VTuple maxValues;
+  private final long [] numNulls;
   private long numRows = 0;
-  private long numBytes = 0;
+  private long numBytes = TajoConstants.UNKNOWN_LENGTH;
 
-  private boolean [] comparable;
+  private final boolean[] columnStatsEnabled;
 
-  public TableStatistics(Schema schema) {
+  public TableStatistics(Schema schema, boolean[] columnStatsEnabled) {
     this.schema = schema;
     minValues = new VTuple(schema.size());
     maxValues = new VTuple(schema.size());
 
     numNulls = new long[schema.size()];
-    comparable = new boolean[schema.size()];
 
-    DataType type;
+    this.columnStatsEnabled = columnStatsEnabled;
     for (int i = 0; i < schema.size(); i++) {
-      type = schema.getColumn(i).getDataType();
-      if (type.getType() == Type.PROTOBUF) {
-        comparable[i] = false;
-      } else {
-        comparable[i] = true;
+      if (schema.getColumn(i).getDataType().getType().equals(Type.PROTOBUF)) {
+        columnStatsEnabled[i] = false;
       }
     }
   }
@@ -85,18 +81,14 @@ public class TableStatistics {
     return this.numBytes;
   }
 
-  public void analyzeNull(int idx) {
-    numNulls[idx]++;
-  }
-
   public void analyzeField(int idx, Tuple tuple) {
-    if (tuple.isBlankOrNull(idx)) {
-      numNulls[idx]++;
-      return;
-    }
+    if (columnStatsEnabled[idx]) {
+      if (tuple.isBlankOrNull(idx)) {
+        numNulls[idx]++;
+        return;
+      }
 
-    Datum datum = tuple.asDatum(idx);
-    if (comparable[idx]) {
+      Datum datum = tuple.asDatum(idx);
       if (!maxValues.contains(idx) ||
           maxValues.get(idx).compareTo(datum) < 0) {
         maxValues.put(idx, datum);
@@ -112,22 +104,24 @@ public class TableStatistics {
     TableStats stat = new TableStats();
 
     for (int i = 0; i < schema.size(); i++) {
-      Column column = schema.getColumn(i);
-      ColumnStats columnStats = new ColumnStats(column);
-      columnStats.setNumNulls(numNulls[i]);
-      if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) {
-        columnStats.setMinValue(minValues.get(i));
-      } else {
-        LOG.warn("Wrong statistics column type (" + minValues.type(i) +
-            ", expected=" + column.getDataType().getType() + ")");
-      }
-      if (minValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) {
-        columnStats.setMaxValue(maxValues.get(i));
-      } else {
-        LOG.warn("Wrong statistics column type (" + maxValues.type(i) +
-            ", expected=" + column.getDataType().getType() + ")");
+      if (columnStatsEnabled[i]) {
+        Column column = schema.getColumn(i);
+        ColumnStats columnStats = new ColumnStats(column);
+        columnStats.setNumNulls(numNulls[i]);
+        if (minValues.isBlank(i) || column.getDataType().getType() == minValues.type(i)) {
+          columnStats.setMinValue(minValues.get(i));
+        } else {
+          LOG.warn("Wrong statistics column type (" + minValues.type(i) +
+              ", expected=" + column.getDataType().getType() + ")");
+        }
+        if (minValues.isBlank(i) || column.getDataType().getType() == maxValues.type(i)) {
+          columnStats.setMaxValue(maxValues.get(i));
+        } else {
+          LOG.warn("Wrong statistics column type (" + maxValues.type(i) +
+              ", expected=" + column.getDataType().getType() + ")");
+        }
+        stat.addColumnStat(columnStats);
       }
-      stat.addColumnStat(columnStats);
     }
 
     stat.setNumRows(this.numRows);

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
index b5f39a0..4289026 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
@@ -53,7 +54,8 @@ public abstract class AbstractHBaseAppender implements Appender {
 
   protected ColumnMapping columnMapping;
   protected TableStatistics stats;
-  protected boolean enabledStats;
+  protected boolean tableStatsEnabled;
+  protected boolean[] columnStatsEnabled;
 
   protected int columnNum;
 
@@ -88,8 +90,8 @@ public abstract class AbstractHBaseAppender implements Appender {
       throw new IllegalStateException("FileAppender is already initialized.");
     }
     inited = true;
-    if (enabledStats) {
-      stats = new TableStatistics(this.schema);
+    if (tableStatsEnabled) {
+      stats = new TableStatistics(this.schema, columnStatsEnabled);
     }
     try {
       columnMapping = new ColumnMapping(schema, meta.getOptions());
@@ -210,12 +212,23 @@ public abstract class AbstractHBaseAppender implements Appender {
 
   @Override
   public void enableStats() {
-    enabledStats = true;
+    if (inited) {
+      throw new IllegalStateException("Should enable this option before init()");
+    }
+
+    this.tableStatsEnabled = true;
+    this.columnStatsEnabled = new boolean[schema.size()];
+  }
+
+  @Override
+  public void enableStats(List<Column> columnList) {
+    enableStats();
+    columnList.forEach(column -> columnStatsEnabled[schema.getIndex(column)] = true);
   }
 
   @Override
   public TableStats getStats() {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       return stats.getTableStat();
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
index 20b1a08..337c062 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java
@@ -89,7 +89,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
 
     htable.put(put);
 
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.incrementRow();
       stats.setNumBytes(totalNumBytes);
     }
@@ -111,7 +111,7 @@ public class HBasePutAppender extends AbstractHBaseAppender {
       htable.flushCommits();
       htable.close();
     }
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.setNumBytes(totalNumBytes);
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
index 42f25cc..228d5a4 100644
--- a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
+++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java
@@ -104,7 +104,7 @@ public class HFileAppender extends AbstractHBaseAppender {
         }
         kvSet.clear();
         // Statistical section
-        if (enabledStats) {
+        if (tableStatsEnabled) {
           stats.incrementRow();
         }
       } catch (InterruptedException e) {
@@ -145,7 +145,7 @@ public class HFileAppender extends AbstractHBaseAppender {
         }
         kvSet.clear();
         // Statistical section
-        if (enabledStats) {
+        if (tableStatsEnabled) {
           stats.incrementRow();
         }
       } catch (InterruptedException e) {
@@ -153,7 +153,7 @@ public class HFileAppender extends AbstractHBaseAppender {
       }
     }
 
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.setNumBytes(totalNumBytes);
     }
     if (writer != null) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
index c6a690b..568df8c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -18,20 +18,19 @@
 
 package org.apache.tajo.storage;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.exception.NotImplementedException;
 
 import java.io.IOException;
+import java.util.List;
 
 public abstract class FileAppender implements Appender {
-  private static final Log LOG = LogFactory.getLog(FileAppender.class);
-
   protected boolean inited = false;
 
   protected final Configuration conf;
@@ -40,7 +39,8 @@ public abstract class FileAppender implements Appender {
   protected final Path workDir;
   protected final TaskAttemptId taskAttemptId;
 
-  protected boolean enabledStats;
+  protected boolean tableStatsEnabled;
+  protected boolean[] columnStatsEnabled;
   protected Path path;
 
   public FileAppender(Configuration conf, TaskAttemptId taskAttemptId, Schema schema,
@@ -65,6 +65,7 @@ public abstract class FileAppender implements Appender {
     }
   }
 
+  @Override
   public void init() throws IOException {
     if (inited) {
      throw new IllegalStateException("FileAppender is already initialized.");
@@ -72,17 +73,28 @@ public abstract class FileAppender implements Appender {
     inited = true;
   }
 
+  @Override
   public void enableStats() {
     if (inited) {
       throw new IllegalStateException("Should enable this option before init()");
     }
 
-    this.enabledStats = true;
+    this.tableStatsEnabled = true;
+    this.columnStatsEnabled = new boolean[schema.size()];
+  }
+
+  @Override
+  public void enableStats(List<Column> columnList) {
+    enableStats();
+    columnList.forEach(column -> columnStatsEnabled[schema.getIndex(column)] = true);
   }
 
+  @Override
   public long getEstimatedOutputSize() throws IOException {
     return getOffset();
   }
 
-  public abstract long getOffset() throws IOException;
+  public long getOffset() throws IOException {
+    throw new IOException(new NotImplementedException());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
deleted file mode 100644
index a82c7ec..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tajo.ExecutionBlockId;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.catalog.statistics.TableStats;
-import org.apache.tajo.util.Pair;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class HashShuffleAppender implements Appender {
-  private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);
-
-  private FileAppender appender;
-  private AtomicBoolean closed = new AtomicBoolean(false);
-  private int partId;
-
-  private TableStats tableStats;
-
-  //<taskId,<page start offset,<task start, task end>>>
-  private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
-
-  //page start offset, length
-  private List<Pair<Long, Integer>> pages = new ArrayList<>();
-
-  private Pair<Long, Integer> currentPage;
-
-  private int pageSize; //MB
-
-  private int rowNumInPage;
-
-  private int totalRows;
-
-  private long offset;
-
-  private ExecutionBlockId ebId;
-
-  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
-    this.ebId = ebId;
-    this.partId = partId;
-    this.appender = appender;
-    this.pageSize = pageSize;
-  }
-
-  @Override
-  public void init() throws IOException {
-    currentPage = new Pair(0L, 0);
-    taskTupleIndexes = new HashMap<>();
-    rowNumInPage = 0;
-  }
-
-  /**
-   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible for the specified partition.
-   * After writing if a current page exceeds pageSize, pageOffset will be added.
-   * @param taskId
-   * @param tuples
-   * @return written bytes
-   * @throws java.io.IOException
-   */
-  public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return 0;
-      }
-      long currentPos = appender.getOffset();
-
-      for (Tuple eachTuple: tuples) {
-        appender.addTuple(eachTuple);
-      }
-      long posAfterWritten = appender.getOffset();
-
-      int writtenBytes = (int)(posAfterWritten - currentPos);
-
-      int nextRowNum = rowNumInPage + tuples.size();
-      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
-      if (taskIndexes == null) {
-        taskIndexes = new ArrayList<>();
-        taskTupleIndexes.put(taskId, taskIndexes);
-      }
-      taskIndexes.add(
-              new Pair<>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
-      rowNumInPage = nextRowNum;
-
-      if (posAfterWritten - currentPage.getFirst() > pageSize) {
-        nextPage(posAfterWritten);
-        rowNumInPage = 0;
-      }
-
-      totalRows += tuples.size();
-      return writtenBytes;
-    }
-  }
-
-  public long getOffset() throws IOException {
-    if (closed.get()) {
-      return offset;
-    } else {
-      return appender.getOffset();
-    }
-  }
-
-  private void nextPage(long pos) {
-    currentPage.setSecond((int) (pos - currentPage.getFirst()));
-    pages.add(currentPage);
-    currentPage = new Pair(pos, 0);
-  }
-
-  @Override
-  public void addTuple(Tuple t) throws IOException {
-    throw new IOException("Not support addTuple, use addTuples()");
-  }
-
-  @Override
-  public void flush() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-    }
-  }
-
-  @Override
-  public long getEstimatedOutputSize() throws IOException {
-    return pageSize * pages.size();
-  }
-
-  @Override
-  public void close() throws IOException {
-    synchronized(appender) {
-      if (closed.get()) {
-        return;
-      }
-      appender.flush();
-      offset = appender.getOffset();
-      if (offset > currentPage.getFirst()) {
-        nextPage(offset);
-      }
-      appender.close();
-      if (LOG.isDebugEnabled()) {
-        if (!pages.isEmpty()) {
-          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
-              + ", lastPage=" + pages.get(pages.size() - 1));
-        } else {
-          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
-        }
-      }
-      closed.set(true);
-      tableStats = appender.getStats();
-    }
-  }
-
-  @Override
-  public void enableStats() {
-  }
-
-  @Override
-  public TableStats getStats() {
-    synchronized(appender) {
-      return appender.getStats();
-    }
-  }
-
-  public List<Pair<Long, Integer>> getPages() {
-    return pages;
-  }
-
-  public Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
-    return taskTupleIndexes;
-  }
-
-  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
-    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<>();
-
-    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
-      merged.addAll(eachFailureIndex);
-    }
-
-    return merged;
-  }
-
-  public void taskFinished(TaskAttemptId taskId) {
-    taskTupleIndexes.remove(taskId);
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
index 4297e4d..62df9a3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -62,8 +62,8 @@ public class HashShuffleAppenderManager {
     pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
   }
 
-  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
-                              TableMeta meta, Schema outSchema) throws IOException {
+  public HashShuffleAppenderWrapper getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+                                                TableMeta meta, Schema outSchema) throws IOException {
     synchronized (appenderMap) {
       Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
 
@@ -93,7 +93,7 @@ public class HashShuffleAppenderManager {
         partitionAppenderMeta = new PartitionAppenderMeta();
         partitionAppenderMeta.partId = partId;
         partitionAppenderMeta.dataFile = dataFile;
-        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+        partitionAppenderMeta.appender = new HashShuffleAppenderWrapper(ebId, partId, pageSize, appender);
         partitionAppenderMeta.appender.init();
         partitionAppenderMap.put(partId, partitionAppenderMeta);
 
@@ -132,7 +132,7 @@ public class HashShuffleAppenderManager {
     }
 
     if (partitionAppenderMap == null) {
-      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
+      LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", not a hash shuffle");
       return null;
     }
 
@@ -152,7 +152,7 @@ public class HashShuffleAppenderManager {
       }
     }
 
-    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermediateEntries.size());
+    LOG.info("Close HashShuffleAppenderWrapper:" + ebId + ", intermediates=" + intermediateEntries.size());
 
     return intermediateEntries;
   }
@@ -210,14 +210,14 @@ public class HashShuffleAppenderManager {
 
   static class PartitionAppenderMeta {
     int partId;
-    HashShuffleAppender appender;
+    HashShuffleAppenderWrapper appender;
     Path dataFile;
 
     public int getPartId() {
       return partId;
     }
 
-    public HashShuffleAppender getAppender() {
+    public HashShuffleAppenderWrapper getAppender() {
       return appender;
     }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
new file mode 100644
index 0000000..bcd4388
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/HashShuffleAppenderWrapper.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.TaskAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Pair;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class HashShuffleAppenderWrapper implements Closeable {
+  private static Log LOG = LogFactory.getLog(HashShuffleAppenderWrapper.class);
+
+  private FileAppender appender;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private int partId;
+
+  //<taskId,<page start offset,<task start, task end>>>
+  private Map<TaskAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+
+  //page start offset, length
+  private List<Pair<Long, Integer>> pages = new ArrayList<>();
+
+  private Pair<Long, Integer> currentPage;
+
+  private int pageSize; //MB
+
+  private int rowNumInPage;
+
+  private long offset;
+
+  private ExecutionBlockId ebId;
+
+  public HashShuffleAppenderWrapper(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
+    this.ebId = ebId;
+    this.partId = partId;
+    this.appender = appender;
+    this.pageSize = pageSize;
+  }
+
+  public void init() throws IOException {
+    currentPage = new Pair(0L, 0);
+    taskTupleIndexes = new HashMap<>();
+    rowNumInPage = 0;
+  }
+
+  /**
+   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible specified partition.
+   * After writing if a current page exceeds pageSize, pageOffset will be added.
+   * @param taskId
+   * @param tuples
+   * @return written bytes
+   * @throws java.io.IOException
+   */
+  public int addTuples(TaskAttemptId taskId, List<Tuple> tuples) throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return 0;
+      }
+      long currentPos = appender.getOffset();
+
+      for (Tuple eachTuple: tuples) {
+        appender.addTuple(eachTuple);
+      }
+      long posAfterWritten = appender.getOffset();
+
+      int writtenBytes = (int)(posAfterWritten - currentPos);
+
+      int nextRowNum = rowNumInPage + tuples.size();
+      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+      if (taskIndexes == null) {
+        taskIndexes = new ArrayList<>();
+        taskTupleIndexes.put(taskId, taskIndexes);
+      }
+      taskIndexes.add(
+              new Pair<>(currentPage.getFirst(), new Pair(rowNumInPage, nextRowNum)));
+      rowNumInPage = nextRowNum;
+
+      if (posAfterWritten - currentPage.getFirst() > pageSize) {
+        nextPage(posAfterWritten);
+        rowNumInPage = 0;
+      }
+
+      return writtenBytes;
+    }
+  }
+
+  public long getOffset() throws IOException {
+    if (closed.get()) {
+      return offset;
+    } else {
+      return appender.getOffset();
+    }
+  }
+
+  private void nextPage(long pos) {
+    currentPage.setSecond((int) (pos - currentPage.getFirst()));
+    pages.add(currentPage);
+    currentPage = new Pair(pos, 0);
+  }
+
+  public void addTuple(Tuple t) throws IOException {
+    throw new IOException("Not support addTuple, use addTuples()");
+  }
+
+  public void flush() throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+      offset = appender.getOffset();
+      if (offset > currentPage.getFirst()) {
+        nextPage(offset);
+      }
+      appender.close();
+      if (LOG.isDebugEnabled()) {
+        if (!pages.isEmpty()) {
+          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
+              + ", lastPage=" + pages.get(pages.size() - 1));
+        } else {
+          LOG.info(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
+        }
+      }
+      closed.set(true);
+    }
+  }
+
+  public TableStats getStats() {
+    synchronized(appender) {
+      return appender.getStats();
+    }
+  }
+
+  public List<Pair<Long, Integer>> getPages() {
+    return pages;
+  }
+
+  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
+    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<>();
+
+    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
+      merged.addAll(eachFailureIndex);
+    }
+
+    return merged;
+  }
+
+  public void taskFinished(TaskAttemptId taskId) {
+    taskTupleIndexes.remove(taskId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index a7b33fa..cda39f9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -35,8 +35,6 @@ import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.exception.TajoRuntimeException;
 import org.apache.tajo.exception.UnsupportedException;
 import org.apache.tajo.plan.expr.EvalNode;
-import org.apache.tajo.plan.serder.PlanProto.ShuffleType;
-import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.BitArray;
@@ -474,7 +472,6 @@ public class RawFile {
     private int headerSize = 0;
     private static final int RECORD_SIZE = 4;
     private long pos;
-    private ShuffleType shuffleType;
 
     private TableStatistics stats;
 
@@ -512,11 +509,8 @@ public class RawFile {
       nullFlags = new BitArray(schema.size());
       headerSize = RECORD_SIZE + 2 + nullFlags.bytesLength();
 
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
-        this.shuffleType = PlannerUtil.getShuffleType(
-            meta.getOption(StorageConstants.SHUFFLE_TYPE,
-                PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
+      if (tableStatsEnabled) {
+        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
       }
 
       super.init();
@@ -646,8 +640,8 @@ public class RawFile {
       // reset the null flags
       nullFlags.clear();
       for (int i = 0; i < schema.size(); i++) {
-        if (shuffleType == ShuffleType.RANGE_SHUFFLE) {
-          // it is to calculate min/max values, and it is only used for the intermediate file.
+        // it is to calculate min/max values, and it is only used for the intermediate file.
+        if (tableStatsEnabled) {
           stats.analyzeField(i, t);
         }
 
@@ -751,7 +745,7 @@ public class RawFile {
       pos += bufferPos - recordOffset;
       buffer.position(bufferPos);
 
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         stats.incrementRow();
       }
     }
@@ -766,7 +760,7 @@ public class RawFile {
     @Override
     public void close() throws IOException {
       flush();
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         stats.setNumBytes(getOffset());
       }
       if (LOG.isDebugEnabled()) {
@@ -786,7 +780,7 @@ public class RawFile {
 
     @Override
     public TableStats getStats() {
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         stats.setNumBytes(pos);
         return stats.getTableStat();
       } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
index 45206f5..87a112e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RowFile.java
@@ -366,8 +366,8 @@ public class RowFile {
 
       nullFlags = new BitArray(schema.size());
 
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
+      if (tableStatsEnabled) {
+        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
         this.shuffleType = PlannerUtil.getShuffleType(
             meta.getOption(StorageConstants.SHUFFLE_TYPE,
                 PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
@@ -462,7 +462,7 @@ public class RowFile {
       out.write(bytes, 0, dataLen);
 
       // Statistical section
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         stats.incrementRow();
       }
     }
@@ -480,7 +480,7 @@ public class RowFile {
     @Override
     public void close() throws IOException {
       if (out != null) {
-        if (enabledStats) {
+        if (tableStatsEnabled) {
           stats.setNumBytes(out.getPos());
         }
         sync();
@@ -505,7 +505,7 @@ public class RowFile {
 
     @Override
     public TableStats getStats() {
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         return stats.getTableStat();
       } else {
         return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
index e54fb80..29f4534 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java
@@ -81,23 +81,12 @@ public class AvroAppender extends FileAppender {
     dataFileWriter = new DataFileWriter<>(datumWriter);
     dataFileWriter.create(avroSchema, outputStream);
 
-    if (enabledStats) {
-      this.stats = new TableStatistics(schema);
+    if (tableStatsEnabled) {
+      this.stats = new TableStatistics(schema, columnStatsEnabled);
     }
     super.init();
   }
 
-  /**
-   * Gets the current offset. Tracking offsets is currently not implemented, so
-   * this method always returns 0.
-   *
-   * @return 0
-   */
-  @Override
-  public long getOffset() throws IOException {
-    return 0;
-  }
-
   private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) {
     if (tuple.isBlankOrNull(i)) {
       return null;
@@ -173,10 +162,13 @@ public class AvroAppender extends FileAppender {
           throw new RuntimeException("Unknown type: " + avroType);
       }
       record.put(i, value);
+      if (tableStatsEnabled) {
+        stats.analyzeField(i, tuple);
+      }
     }
     dataFileWriter.append(record);
 
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.incrementRow();
     }
   }
@@ -195,6 +187,11 @@ public class AvroAppender extends FileAppender {
   @Override
   public void close() throws IOException {
     IOUtils.cleanup(null, dataFileWriter);
+
+    // TODO: getOffset is not implemented yet
+//    if (tableStatsEnabled) {
+//      stats.setNumBytes(getOffset());
+//    }
   }
 
   /**
@@ -204,7 +201,7 @@ public class AvroAppender extends FileAppender {
    */
   @Override
   public TableStats getStats() {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       return stats.getTableStat();
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
index dbbf5a6..7d00206 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
@@ -62,27 +62,22 @@ public class ORCAppender extends FileAppender {
         StorageConstants.DEFAULT_ORC_ROW_INDEX_STRIDE)),
       timezone);
 
-    if (enabledStats) {
-      this.stats = new TableStatistics(schema);
+    if (tableStatsEnabled) {
+      this.stats = new TableStatistics(schema, columnStatsEnabled);
     }
 
     super.init();
   }
 
   @Override
-  public long getOffset() throws IOException {
-    return 0;
-  }
-
-  @Override
   public void addTuple(Tuple tuple) throws IOException {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       for (int i = 0; i < schema.size(); ++i) {
         stats.analyzeField(i, tuple);
       }
     }
     writer.addTuple(tuple);
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.incrementRow();
     }
   }
@@ -94,11 +89,16 @@ public class ORCAppender extends FileAppender {
   @Override
   public void close() throws IOException {
     writer.close();
+
+    // TODO: getOffset is not implemented yet
+//    if (tableStatsEnabled) {
+//      stats.setNumBytes(getOffset());
+//    }
   }
 
   @Override
   public TableStats getStats() {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       return stats.getTableStat();
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 41e4269..07c9d0e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -18,20 +18,19 @@
 
 package org.apache.tajo.storage.parquet;
 
-import org.apache.hadoop.io.IOUtils;
-import org.apache.tajo.TaskAttemptId;
-import org.apache.tajo.storage.StorageConstants;
-import parquet.hadoop.ParquetOutputFormat;
-import parquet.hadoop.metadata.CompressionCodecName;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tajo.TaskAttemptId;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.storage.FileAppender;
+import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.storage.TableStatistics;
 import org.apache.tajo.storage.Tuple;
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.metadata.CompressionCodecName;
 
 import java.io.IOException;
 
@@ -82,24 +81,13 @@ public class ParquetAppender extends FileAppender {
                                    pageSize,
                                    enableDictionary,
                                    validating);
-    if (enabledStats) {
-      this.stats = new TableStatistics(schema);
+    if (tableStatsEnabled) {
+      this.stats = new TableStatistics(schema, columnStatsEnabled);
     }
     super.init();
   }
 
   /**
-   * Gets the current offset. Tracking offsets is currently not implemented, so
-   * this method always returns 0.
-   *
-   * @return 0
-   */
-  @Override
-  public long getOffset() throws IOException {
-    return 0;
-  }
-
-  /**
    * Write a Tuple to the Parquet file.
    *
    * @param tuple The Tuple to write.
@@ -107,7 +95,7 @@ public class ParquetAppender extends FileAppender {
   @Override
   public void addTuple(Tuple tuple) throws IOException {
     writer.write(tuple);
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.incrementRow();
     }
   }
@@ -125,6 +113,11 @@ public class ParquetAppender extends FileAppender {
   @Override
   public void close() throws IOException {
     IOUtils.cleanup(null, writer);
+
+    // TODO: getOffset is not implemented yet
+//    if (tableStatsEnabled) {
+//      stats.setNumBytes(getOffset());
+//    }
   }
 
   public long getEstimatedOutputSize() throws IOException {
@@ -138,7 +131,7 @@ public class ParquetAppender extends FileAppender {
    */
   @Override
   public TableStats getStats() {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       return stats.getTableStat();
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
index 03642a7..23ef059 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileWriter.java
@@ -89,8 +89,8 @@ public class DirectRawFileWriter extends FileAppender {
       isLocal = false;
     }
 
-    if (enabledStats) {
-      this.stats = new TableStatistics(this.schema);
+    if (tableStatsEnabled) {
+      this.stats = new TableStatistics(this.schema, columnStatsEnabled);
       this.shuffleType = PlannerUtil.getShuffleType(
           meta.getOption(StorageConstants.SHUFFLE_TYPE,
               PlannerUtil.getShuffleType(ShuffleType.NONE_SHUFFLE)));
@@ -115,7 +115,7 @@ public class DirectRawFileWriter extends FileAppender {
 
     rowBlock.getMemory().clear();
 
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.incrementRows(rowBlock.rows() - stats.getNumRows());
     }
   }
@@ -147,7 +147,7 @@ public class DirectRawFileWriter extends FileAppender {
   public void close() throws IOException {
     flush();
 
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.setNumBytes(getOffset());
     }
     if (LOG.isDebugEnabled()) {
@@ -160,7 +160,7 @@ public class DirectRawFileWriter extends FileAppender {
 
   @Override
   public TableStats getStats() {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.setNumBytes(pos);
       return stats.getTableStat();
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index ed55506..20519b7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -601,7 +601,6 @@ public class RCFile {
     boolean useNewMagic = true;
     private byte[] nullChars;
     private SerializerDeserializer serde;
-    private boolean isShuffle;
 
     // Insert a globally unique 16-byte value every few entries, so that one
     // can seek into the middle of a file and then synchronize with record
@@ -774,8 +773,8 @@ public class RCFile {
       writeFileHeader();
       finalizeFileHeader();
 
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
+      if (tableStatsEnabled) {
+        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
       }
       super.init();
     }
@@ -866,7 +865,7 @@ public class RCFile {
       append(t);
       // Statistical section
 
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         stats.incrementRow();
       }
     }
@@ -882,22 +881,14 @@ public class RCFile {
      * @throws java.io.IOException
      */
     public void append(Tuple tuple) throws IOException {
-      int size = schema.size();
-
-      for (int i = 0; i < size; i++) {
+      for (int i = 0; i < columnNumber; i++) {
+        if (tableStatsEnabled) {
+          stats.analyzeField(i, tuple);
+        }
         int length = columnBuffers[i].append(tuple, i);
         columnBufferSize += length;
       }
 
-      if (size < columnNumber) {
-        for (int i = size; i < columnNumber; i++) {
-          columnBuffers[i].append(NullDatum.get());
-          if (isShuffle) {
-            stats.analyzeNull(i);
-          }
-        }
-      }
-
       bufferedRecords++;
       //TODO compression rate base flush
       if ((columnBufferSize > COLUMNS_BUFFER_SIZE)
@@ -1077,7 +1068,7 @@ public class RCFile {
 
     @Override
     public TableStats getStats() {
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         return stats.getTableStat();
       } else {
         return null;
@@ -1093,7 +1084,7 @@ public class RCFile {
 
       if (out != null) {
         // Statistical section
-        if (enabledStats) {
+        if (tableStatsEnabled) {
           stats.setNumBytes(getOffset());
         }
         // Close the underlying stream if we own it...

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
index b1a14e3..8e0a88c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileAppender.java
@@ -134,8 +134,8 @@ public class SequenceFileAppender extends FileAppender {
       writer = SequenceFile.createWriter(fs, conf, path, keyClass, valueClass, CompressionType.NONE, codec);
     }
 
-    if (enabledStats) {
-      this.stats = new TableStatistics(this.schema);
+    if (tableStatsEnabled) {
+      this.stats = new TableStatistics(this.schema, columnStatsEnabled);
     }
 
     super.init();
@@ -203,7 +203,7 @@ public class SequenceFileAppender extends FileAppender {
     pos += writer.getLength();
     rowCount++;
 
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.incrementRow();
     }
   }
@@ -221,7 +221,7 @@ public class SequenceFileAppender extends FileAppender {
   @Override
   public void close() throws IOException {
     // Statistical section
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       stats.setNumBytes(getOffset());
     }
 
@@ -230,7 +230,7 @@ public class SequenceFileAppender extends FileAppender {
 
   @Override
   public TableStats getStats() {
-    if (enabledStats) {
+    if (tableStatsEnabled) {
       return stats.getTableStat();
     } else {
       return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index c0ee784..53fbd57 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -131,8 +131,8 @@ public class DelimitedTextFile {
 
     @Override
     public void init() throws IOException {
-      if (enabledStats) {
-        this.stats = new TableStatistics(this.schema);
+      if (tableStatsEnabled) {
+        this.stats = new TableStatistics(this.schema, columnStatsEnabled);
       }
 
       if(serializer != null) {
@@ -189,7 +189,7 @@ public class DelimitedTextFile {
         flushBuffer();
       }
       // Statistical section
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         stats.incrementRow();
       }
     }
@@ -226,7 +226,7 @@ public class DelimitedTextFile {
         flush();
 
         // Statistical section
-        if (enabledStats) {
+        if (tableStatsEnabled) {
           stats.setNumBytes(getOffset());
         }
 
@@ -246,7 +246,7 @@ public class DelimitedTextFile {
 
     @Override
     public TableStats getStats() {
-      if (enabledStats) {
+      if (tableStatsEnabled) {
         return stats.getTableStat();
       } else {
         return null;

http://git-wip-us.apache.org/repos/asf/tajo/blob/011fcd92/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index cc69119..9c30202 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -161,8 +161,7 @@ public class TestCompressionStorages {
     }
 
     int tupleCnt = 0;
-    Tuple tuple;
-    while ((tuple = scanner.next()) != null) {
+    while ((scanner.next()) != null) {
       tupleCnt++;
     }
     scanner.close();


Mime
View raw message