tajo-commits mailing list archives

From hyun...@apache.org
Subject git commit: TAJO-1008: Protocol buffer De/Serialization for EvalNode.
Date Wed, 20 Aug 2014 05:22:46 GMT
Repository: tajo
Updated Branches:
  refs/heads/master b16d13add -> 49d52553c


TAJO-1008: Protocol buffer De/Serialization for EvalNode.


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/49d52553
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/49d52553
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/49d52553

Branch: refs/heads/master
Commit: 49d52553c5e4ef37a6af4605befbe36b0dd36c3c
Parents: b16d13a
Author: Hyunsik Choi <hyunsik@apache.org>
Authored: Wed Aug 20 14:22:28 2014 +0900
Committer: Hyunsik Choi <hyunsik@apache.org>
Committed: Wed Aug 20 14:22:28 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   3 +
 .../apache/tajo/catalog/TestKeyValueSet.java    |   1 -
 .../org/apache/tajo/client/TajoGetConf.java     |  12 --
 .../java/org/apache/tajo/OverridableConf.java   |  12 ++
 .../java/org/apache/tajo/util/BitArray.java     |   2 +-
 .../java/org/apache/tajo/util/KeyValueSet.java  |   1 -
 .../planner/physical/ColPartitionStoreExec.java |  58 +++++--
 .../HashBasedColPartitionStoreExec.java         |   6 +-
 .../SortBasedColPartitionStoreExec.java         |  40 ++---
 .../engine/planner/physical/StoreTableExec.java |  66 +++++--
 .../org/apache/tajo/master/session/Session.java |   8 +-
 .../planner/physical/TestPhysicalPlanner.java   | 173 ++++++++++++++++++-
 .../java/org/apache/tajo/storage/Appender.java  |   2 +
 .../org/apache/tajo/storage/FileAppender.java   |   4 +
 .../org/apache/tajo/storage/StorageUtil.java    |  21 +++
 .../tajo/storage/parquet/ParquetAppender.java   |   4 +
 .../parquet/InternalParquetRecordWriter.java    |   4 +
 .../thirdparty/parquet/ParquetWriter.java       |   4 +
 .../tajo/storage/TestCompressionStorages.java   |   2 -
 19 files changed, 351 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index b605c71..96ffa4a 100644
--- a/CHANGES
+++ b/CHANGES
@@ -31,6 +31,9 @@ Release 0.9.0 - unreleased
 
   IMPROVEMENT
 
+    TAJO-1008: Protocol buffer De/Serialization for EvalNode.
+    (hyunsik)
+
     TAJO-984: Improve the default data type handling in RowStoreUtil.
     (jihoon via hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
index b317ba4..a09275d 100644
--- a/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
+++ b/tajo-catalog/tajo-catalog-common/src/test/java/org/apache/tajo/catalog/TestKeyValueSet.java
@@ -23,7 +23,6 @@ import org.apache.tajo.util.KeyValueSet;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class TestKeyValueSet {

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
----------------------------------------------------------------------
diff --git a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
index 52e1894..2377427 100644
--- a/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
+++ b/tajo-client/src/main/java/org/apache/tajo/client/TajoGetConf.java
@@ -20,24 +20,12 @@ package org.apache.tajo.client;
 
 import com.google.protobuf.ServiceException;
 import org.apache.commons.cli.*;
-import org.apache.commons.lang.StringUtils;
-import org.apache.tajo.QueryId;
-import org.apache.tajo.TajoProtos;
 import org.apache.tajo.conf.TajoConf;
-import org.apache.tajo.ipc.ClientProtos.BriefQueryInfo;
-import org.apache.tajo.ipc.ClientProtos.WorkerResourceInfo;
-import org.apache.tajo.util.NetUtils;
-import org.apache.tajo.util.TajoIdUtils;
 
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
-import java.net.InetSocketAddress;
 import java.sql.SQLException;
-import java.text.DecimalFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
 
 public class TajoGetConf {
   private static final org.apache.commons.cli.Options options;

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
index 220bd43..c9cf7fa 100644
--- a/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
+++ b/tajo-common/src/main/java/org/apache/tajo/OverridableConf.java
@@ -99,6 +99,10 @@ public class OverridableConf extends KeyValueSet {
     return getBool(key, null);
   }
 
+  public void setInt(ConfigKey key, int val) {
+    setInt(key.keyname(), val);
+  }
+
   public int getInt(ConfigKey key, Integer defaultVal) {
     assertRegisteredEnum(key);
 
@@ -120,6 +124,10 @@ public class OverridableConf extends KeyValueSet {
     return getInt(key, null);
   }
 
+  public void setLong(ConfigKey key, long val) {
+    setLong(key.keyname(), val);
+  }
+
   public long getLong(ConfigKey key, Long defaultVal) {
     assertRegisteredEnum(key);
 
@@ -141,6 +149,10 @@ public class OverridableConf extends KeyValueSet {
     return getLong(key, null);
   }
 
+  public void setFloat(ConfigKey key, float val) {
+    setFloat(key.keyname(), val);
+  }
+
   public float getFloat(ConfigKey key, Float defaultVal) {
     assertRegisteredEnum(key);
 

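The new typed setters mirror the ConfigKey-based getters that already existed. A minimal usage sketch (assuming Tajo's classes on the classpath; QueryContext is the OverridableConf subclass used by the tests further down, and MAX_OUTPUT_FILE_SIZE is interpreted in megabytes by the store executors):

    // Sketch: set the session variable through the typed setter and read it back the way
    // the store executors below do (MB value converted to bytes via StorageUnit.MB).
    QueryContext queryContext = new QueryContext(conf, session);
    queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);   // limit each output file to ~1 MB
    long maxPerFileSize = queryContext.getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
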
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
index 9905b6f..e62496a 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/BitArray.java
@@ -80,7 +80,7 @@ public class BitArray {
   public void fromByteBuffer(ByteBuffer byteBuffer) {
     clear();
     int i = 0;
-    while(byteBuffer.hasRemaining()) {
+    while(i < data.length && byteBuffer.hasRemaining()) {
       data[i] = byteBuffer.get();
       i++;
     }

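For reference, a standalone sketch (plain Java, not the BitArray class itself) of the bounded-copy pattern this fix introduces: the loop now stops at the end of the destination array even if the buffer still has bytes remaining.

    import java.nio.ByteBuffer;

    class BoundedCopySketch {
      // Copy at most data.length bytes; a longer buffer no longer overruns the array.
      static void copyBounded(byte[] data, ByteBuffer byteBuffer) {
        int i = 0;
        while (i < data.length && byteBuffer.hasRemaining()) {
          data[i] = byteBuffer.get();
          i++;
        }
      }

      public static void main(String[] args) {
        byte[] data = new byte[2];
        copyBounded(data, ByteBuffer.wrap(new byte[]{1, 2, 3}));  // the third byte is simply ignored
        System.out.println(data[0] + "," + data[1]);              // prints 1,2
      }
    }
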
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
index 0c3db6d..6edb547 100644
--- a/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
+++ b/tajo-common/src/main/java/org/apache/tajo/util/KeyValueSet.java
@@ -21,7 +21,6 @@ package org.apache.tajo.util;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.gson.annotations.Expose;
-import org.apache.tajo.OverridableConf;
 import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.json.CommonGsonHelper;
 import org.apache.tajo.json.GsonObject;

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
index e90baff..7d7d020 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java
@@ -24,10 +24,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.CreateTableNode;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.NodeType;
@@ -35,6 +37,7 @@ import org.apache.tajo.engine.planner.logical.StoreTableNode;
 import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.StorageManagerFactory;
 import org.apache.tajo.storage.StorageUtil;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -50,6 +53,14 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
   protected final int [] keyIds;
   protected final String [] keyNames;
 
+  protected Appender appender;
+
+  // for file punctuation
+  protected TableStats aggregatedStats;                  // for aggregating all stats of written files
+  protected long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63
+  protected int writtenFileNum = 0;               // how many file are written so far?
+  protected Path lastFileName;                    // latest written file name
+
   public ColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child) {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
@@ -67,6 +78,15 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
 
+    if (!(plan instanceof InsertNode)) {
+      String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR);
+      StorageUtil.setNullCharForTextSerializer(meta, nullChar);
+    }
+
+    if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
+      maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
+    }
+
     // Find column index to name subpartition directory path
     keyNum = this.plan.getPartitionMethod().getExpressionSchema().size();
 
@@ -107,33 +127,45 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec {
     if (!fs.exists(storeTablePath.getParent())) {
       fs.mkdirs(storeTablePath.getParent());
     }
+
+    aggregatedStats = new TableStats();
   }
 
   protected Path getDataFile(String partition) {
     return StorageUtil.concatPath(storeTablePath.getParent(), partition, storeTablePath.getName());
   }
 
-  protected Appender makeAppender(String partition) throws IOException {
-    Path dataFile = getDataFile(partition);
-    FileSystem fs = dataFile.getFileSystem(context.getConf());
+  protected Appender getNextPartitionAppender(String partition) throws IOException {
+    lastFileName = getDataFile(partition);
+    FileSystem fs = lastFileName.getFileSystem(context.getConf());
 
-    if (fs.exists(dataFile.getParent())) {
-      LOG.info("Path " + dataFile.getParent() + " already exists!");
+    if (fs.exists(lastFileName.getParent())) {
+      LOG.info("Path " + lastFileName.getParent() + " already exists!");
     } else {
-      fs.mkdirs(dataFile.getParent());
-      LOG.info("Add subpartition path directory :" + dataFile.getParent());
+      fs.mkdirs(lastFileName.getParent());
+      LOG.info("Add subpartition path directory :" + lastFileName.getParent());
     }
 
-    if (fs.exists(dataFile)) {
-      LOG.info("File " + dataFile + " already exists!");
-      FileStatus status = fs.getFileStatus(dataFile);
+    if (fs.exists(lastFileName)) {
+      LOG.info("File " + lastFileName + " already exists!");
+      FileStatus status = fs.getFileStatus(lastFileName);
       LOG.info("File size: " + status.getLen());
     }
 
-    Appender appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, dataFile);
-    appender.enableStats();
-    appender.init();
+    openAppender(0);
 
     return appender;
   }
+
+  public void openAppender(int suffixId) throws IOException {
+    Path actualFilePath = lastFileName;
+    if (suffixId > 0) {
+      actualFilePath = new Path(lastFileName + "_" + suffixId);
+    }
+
+    appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, actualFilePath);
+
+    appender.enableStats();
+    appender.init();
+  }
 }

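The rotation naming scheme introduced here is simple: the first file of a partition keeps the original data file name, and every subsequent file gets a numeric suffix. A plain-string sketch of the path choice made in openAppender(int suffixId):

    // Illustration only; openAppender() builds the same name with new Path(lastFileName + "_" + suffixId).
    static String suffixedFileName(String baseName, int suffixId) {
      return suffixId > 0 ? baseName + "_" + suffixId : baseName;
    }
    // suffixedFileName("part-01-000000", 0) -> "part-01-000000"
    // suffixedFileName("part-01-000000", 2) -> "part-01-000000_2"
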
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
index 44d1270..e27a43d 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashBasedColPartitionStoreExec.java
@@ -18,8 +18,6 @@
 
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
 import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
@@ -39,8 +37,6 @@ import java.util.Map;
  * This class is a physical operator to store at column partitioned table.
  */
 public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
-  private static Log LOG = LogFactory.getLog(HashBasedColPartitionStoreExec.class);
-
   private final Map<String, Appender> appenderMap = new HashMap<String, Appender>();
 
   public HashBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
@@ -56,7 +52,7 @@ public class HashBasedColPartitionStoreExec extends ColPartitionStoreExec {
     Appender appender = appenderMap.get(partition);
 
     if (appender == null) {
-      appender = makeAppender(partition);
+      appender = getNextPartitionAppender(partition);
       appenderMap.put(partition, appender);
     } else {
       appender = appenderMap.get(partition);

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
index 9ce455f..6084f0e 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SortBasedColPartitionStoreExec.java
@@ -21,13 +21,9 @@
  */
 package org.apache.tajo.engine.planner.physical;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.tajo.catalog.statistics.StatisticsUtil;
-import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.datum.Datum;
 import org.apache.tajo.engine.planner.logical.StoreTableNode;
-import org.apache.tajo.storage.Appender;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
 import org.apache.tajo.util.StringUtils;
@@ -40,14 +36,9 @@ import java.io.IOException;
  * ascending or descending order of partition columns.
  */
 public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
-  private static Log LOG = LogFactory.getLog(SortBasedColPartitionStoreExec.class);
-
   private Tuple currentKey;
   private Tuple prevKey;
 
-  private Appender appender;
-  private TableStats aggregated;
-
   public SortBasedColPartitionStoreExec(TaskAttemptContext context, StoreTableNode plan, PhysicalExec child)
       throws IOException {
     super(context, plan, child);
@@ -57,12 +48,6 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
     super.init();
 
     currentKey = new VTuple(keyNum);
-    aggregated = new TableStats();
-  }
-
-  private Appender getAppender(String partition) throws IOException {
-    this.appender = makeAppender(partition);
-    return appender;
   }
 
   private void fillKeyTuple(Tuple inTuple, Tuple keyTuple) {
@@ -93,19 +78,30 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
       fillKeyTuple(tuple, currentKey);
 
       if (prevKey == null) {
-        appender = getAppender(getSubdirectory(currentKey));
+        appender = getNextPartitionAppender(getSubdirectory(currentKey));
         prevKey = new VTuple(currentKey);
       } else {
-        if (!prevKey.equals(currentKey) && !getSubdirectory(prevKey).equalsIgnoreCase(getSubdirectory(currentKey))) {
+        if (!prevKey.equals(currentKey)) {
           appender.close();
-          StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
+          StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats());
 
-          appender = getAppender(getSubdirectory(currentKey));
+          appender = getNextPartitionAppender(getSubdirectory(currentKey));
           prevKey = new VTuple(currentKey);
+
+          // reset all states for file rotating
+          writtenFileNum = 0;
         }
       }
 
       appender.addTuple(tuple);
+
+      if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
+        appender.close();
+        writtenFileNum++;
+        StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats());
+
+        openAppender(writtenFileNum);
+      }
     }
 
     return null;
@@ -115,8 +111,10 @@ public class SortBasedColPartitionStoreExec extends ColPartitionStoreExec {
   public void close() throws IOException {
     if (appender != null) {
       appender.close();
-      StatisticsUtil.aggregateTableStat(aggregated, appender.getStats());
-      context.setResultStats(aggregated);
+
+      // Collect statistics data
+      StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats());
+      context.setResultStats(aggregatedStats);
     }
   }
 

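The core of the change in next() is the per-tuple size check; the same lines again, annotated (maxPerFileSize is MAX_OUTPUT_FILE_SIZE converted to bytes, and writtenFileNum is reset to 0 whenever a new partition key starts):

    if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
      appender.close();                                                         // seal the current file
      writtenFileNum++;                                                         // next suffix: _1, _2, ...
      StatisticsUtil.aggregateTableStat(aggregatedStats, appender.getStats());  // fold in its stats
      openAppender(writtenFileNum);                                             // continue in a new file
    }
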
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
index e73cc2f..3753d26 100644
--- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
+++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java
@@ -18,15 +18,18 @@
 
 package org.apache.tajo.engine.planner.physical;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.apache.tajo.SessionVars;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.StatisticsUtil;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.engine.planner.logical.InsertNode;
 import org.apache.tajo.engine.planner.logical.PersistentStoreNode;
-import org.apache.tajo.storage.Appender;
-import org.apache.tajo.storage.StorageConstants;
-import org.apache.tajo.storage.StorageManagerFactory;
-import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.*;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.worker.TaskAttemptContext;
 
 import java.io.IOException;
@@ -35,24 +38,50 @@ import java.io.IOException;
  * This is a physical executor to store a table part into a specified storage.
  */
 public class StoreTableExec extends UnaryPhysicalExec {
+  private static final Log LOG = LogFactory.getLog(StoreTableExec.class);
+
   private PersistentStoreNode plan;
+  private TableMeta meta;
   private Appender appender;
   private Tuple tuple;
 
+  // for file punctuation
+  private TableStats sumStats;                  // for aggregating all stats of written files
+  private long maxPerFileSize = Long.MAX_VALUE; // default max file size is 2^63
+  private int writtenFileNum = 0;               // how many file are written so far?
+  private Path lastFileName;                    // latest written file name
+
   public StoreTableExec(TaskAttemptContext context, PersistentStoreNode plan, PhysicalExec child) throws IOException {
     super(context, plan.getInSchema(), plan.getOutSchema(), child);
     this.plan = plan;
+
+    if (context.getQueryContext().containsKey(SessionVars.MAX_OUTPUT_FILE_SIZE)) {
+      maxPerFileSize = context.getQueryContext().getLong(SessionVars.MAX_OUTPUT_FILE_SIZE) * StorageUnit.MB;
+    }
   }
 
   public void init() throws IOException {
     super.init();
 
-    TableMeta meta;
     if (plan.hasOptions()) {
       meta = CatalogUtil.newTableMeta(plan.getStorageType(), plan.getOptions());
     } else {
       meta = CatalogUtil.newTableMeta(plan.getStorageType());
     }
+    openNewFile(writtenFileNum);
+
+    sumStats = new TableStats();
+  }
+
+  public void openNewFile(int suffixId) throws IOException {
+    String prevFile = null;
+
+    lastFileName = context.getOutputPath();
+    if (suffixId > 0) {
+      prevFile = lastFileName.toString();
+
+      lastFileName = new Path(lastFileName + "_" + suffixId);
+    }
 
     if (plan instanceof InsertNode) {
       InsertNode createTableNode = (InsertNode) plan;
@@ -60,13 +89,17 @@ public class StoreTableExec extends UnaryPhysicalExec {
           createTableNode.getTableSchema(), context.getOutputPath());
     } else {
       String nullChar = context.getQueryContext().get(SessionVars.NULL_CHAR);
-      meta.putOption(StorageConstants.CSVFILE_NULL, nullChar);
-      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema,
-          context.getOutputPath());
+      StorageUtil.setNullCharForTextSerializer(meta, nullChar);
+      appender = StorageManagerFactory.getStorageManager(context.getConf()).getAppender(meta, outSchema, lastFileName);
     }
 
     appender.enableStats();
     appender.init();
+
+    if (suffixId > 0) {
+      LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " ("
+ maxPerFileSize + " MB), " +
+          "The remain output will be written into " + lastFileName.toString());
+    }
   }
 
   /* (non-Javadoc)
@@ -76,8 +109,16 @@ public class StoreTableExec extends UnaryPhysicalExec {
   public Tuple next() throws IOException {
     while((tuple = child.next()) != null) {
       appender.addTuple(tuple);
-    }
 
+      if (maxPerFileSize > 0 && maxPerFileSize <= appender.getEstimatedOutputSize()) {
+        appender.close();
+
+        writtenFileNum++;
+        StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
+        openNewFile(writtenFileNum);
+      }
+    }
+        
     return null;
   }
 
@@ -93,7 +134,12 @@ public class StoreTableExec extends UnaryPhysicalExec {
       appender.flush();
       appender.close();
       // Collect statistics data
-      context.setResultStats(appender.getStats());
+      if (sumStats == null) {
+        context.setResultStats(appender.getStats());
+      } else {
+        StatisticsUtil.aggregateTableStat(sumStats, appender.getStats());
+        context.setResultStats(sumStats);
+      }
       if (context.getTaskId() != null) {
         context.addShuffleFileOutput(0, context.getTaskId().toString());
       }

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
index cdf552d..1f21e2a 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/session/Session.java
@@ -28,7 +28,7 @@ import java.util.Map;
 
 import static org.apache.tajo.ipc.TajoWorkerProtocol.SessionProto;
 
-public class Session implements SessionConstants, ProtoObject<SessionProto> {
+public class Session implements SessionConstants, ProtoObject<SessionProto>, Cloneable {
   private final String sessionId;
   private final String userName;
   private String currentDatabase;
@@ -133,4 +133,10 @@ public class Session implements SessionConstants, ProtoObject<SessionProto> {
   public String toString() {
     return "user=" + getUserName() + ",id=" + getSessionId() +",last_atime=" + getLastAccessTime();
   }
+
+  public Session clone() throws CloneNotSupportedException {
+    Session newSession = (Session) super.clone();
+    newSession.sessionVariables.putAll(getAllVariables());
+    return newSession;
+  }
 }

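Object.clone() only produces a shallow copy, so the new clone() explicitly copies the session variables into the cloned instance (via putAll(getAllVariables()) above). A generic plain-Java illustration of keeping a clone's variable map independent of the original:

    import java.util.HashMap;
    import java.util.Map;

    class VariableHolder implements Cloneable {
      Map<String, String> variables = new HashMap<String, String>();

      @Override
      public VariableHolder clone() throws CloneNotSupportedException {
        VariableHolder copy = (VariableHolder) super.clone();         // shallow copy of all fields
        copy.variables = new HashMap<String, String>(this.variables); // detach the map from the original
        return copy;
      }
    }
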
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
index a184a9a..da66aa5 100644
--- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java
@@ -25,13 +25,11 @@ import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tajo.LocalTajoTestingUtility;
-import org.apache.tajo.QueryUnitAttemptId;
-import org.apache.tajo.TajoConstants;
-import org.apache.tajo.TajoTestingCluster;
+import org.apache.tajo.*;
 import org.apache.tajo.algebra.Expr;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
@@ -52,6 +50,7 @@ import org.apache.tajo.storage.*;
 import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.index.bst.BSTIndex;
+import org.apache.tajo.unit.StorageUnit;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
@@ -89,6 +88,7 @@ public class TestPhysicalPlanner {
 
   private static TableDesc employee = null;
   private static TableDesc score = null;
+  private static TableDesc largeScore = null;
 
   private static MasterPlan masterPlan;
 
@@ -171,6 +171,45 @@ public class TestPhysicalPlanner {
 
     defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
     masterPlan = new MasterPlan(LocalTajoTestingUtility.newQueryId(), null, null);
+
+    createLargeScoreTable();
+  }
+
+  public static void createLargeScoreTable() throws IOException {
+    // Preparing a large table
+    Path scoreLargePath = new Path(testDir, "score_large");
+    CommonTestingUtil.cleanupTestDir(scoreLargePath.toString());
+
+    Schema scoreSchmea = score.getSchema();
+    TableMeta scoreLargeMeta = CatalogUtil.newTableMeta(StoreType.RAW, new KeyValueSet());
+    Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(scoreLargeMeta, scoreSchmea,
+        scoreLargePath);
+    appender.enableStats();
+    appender.init();
+    largeScore = new TableDesc(
+        CatalogUtil.buildFQName(TajoConstants.DEFAULT_DATABASE_NAME, "score_large"), scoreSchmea, scoreLargeMeta,
+        scoreLargePath);
+
+    Tuple tuple = new VTuple(scoreSchmea.size());
+    int m = 0;
+    for (int i = 1; i <= 35000; i++) {
+      for (int k = 3; k < 5; k++) { // |{3,4}| = 2
+        for (int j = 1; j <= 3; j++) { // |{1,2,3}| = 3
+          tuple.put(
+              new Datum[] {
+                  DatumFactory.createText("name_" + i), // name_1 ~ 5 (cad: // 5)
+                  DatumFactory.createText(k + "rd"), // 3 or 4rd (cad: 2)
+                  DatumFactory.createInt4(j), // 1 ~ 3
+                  m % 3 == 1 ? DatumFactory.createText("one") : NullDatum.get()});
+          appender.addTuple(tuple);
+          m++;
+        }
+      }
+    }
+    appender.flush();
+    appender.close();
+    largeScore.setStats(appender.getStats());
+    catalog.createTable(largeScore);
   }
 
   @AfterClass
@@ -376,7 +415,10 @@ public class TestPhysicalPlanner {
   private String[] CreateTableAsStmts = {
       "create table grouped1 as select deptName, class, sum(score), max(score), min(score)
from score group by deptName, class", // 0
       "create table grouped2 using rcfile as select deptName, class, sum(score), max(score),
min(score) from score group by deptName, class", // 1
-      "create table grouped3 partition by column (dept text,  class text) as select sum(score),
max(score), min(score), deptName, class from score group by deptName, class", // 2
+      "create table grouped3 partition by column (dept text,  class text) as select sum(score),
max(score), min(score), deptName, class from score group by deptName, class", // 2,
+      "create table score_large_output as select * from score_large", // 4
+      "CREATE TABLE score_part (deptname text, score int4, nullable text) PARTITION BY COLUMN
(class text) " +
+          "AS SELECT deptname, score, nullable, class from score_large" // 5
   };
 
   @Test
@@ -422,6 +464,62 @@ public class TestPhysicalPlanner {
   }
 
   @Test
+  public final void testStorePlanWithMaxOutputFileSize() throws IOException, PlanningException,
+      CloneNotSupportedException {
+
+    TableStats stats = largeScore.getStats();
+    assertTrue("Checking meaningfulness of test", stats.getNumBytes() > StorageUnit.MB);
+
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+        largeScore.getPath(), Integer.MAX_VALUE);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testStorePlanWithMaxOutputFileSize");
+
+    QueryContext queryContext = new QueryContext(conf, session);
+    queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);
+
+    TaskAttemptContext ctx = new TaskAttemptContext(
+        queryContext,
+        LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan),
+        new FileFragment[] { frags[0] }, workDir);
+    ctx.setEnforcer(new Enforcer());
+    ctx.setOutputPath(new Path(workDir, "maxOutput"));
+
+    Expr context = analyzer.parse(CreateTableAsStmts[3]);
+
+    LogicalPlan plan = planner.createPlan(queryContext, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    // executing StoreTableExec
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    // checking the number of punctuated files
+    int expectedFileNum = (int) Math.ceil(stats.getNumBytes() / (float) StorageUnit.MB);
+    FileSystem fs = ctx.getOutputPath().getFileSystem(conf);
+    FileStatus [] statuses = fs.listStatus(ctx.getOutputPath().getParent());
+    assertEquals(expectedFileNum, statuses.length);
+
+    // checking the file contents
+    long totalNum = 0;
+    for (FileStatus status : fs.listStatus(ctx.getOutputPath().getParent())) {
+      Scanner scanner = StorageManagerFactory.getStorageManager(conf).getFileScanner(
+          CatalogUtil.newTableMeta(StoreType.CSV),
+          rootNode.getOutSchema(),
+          status.getPath());
+
+      scanner.init();
+      while ((scanner.next()) != null) {
+        totalNum++;
+      }
+      scanner.close();
+    }
+    assertTrue(totalNum == ctx.getResultStats().getNumRows());
+  }
+
+  @Test
   public final void testStorePlanWithRCFile() throws IOException, PlanningException {
     FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),
         Integer.MAX_VALUE);
@@ -588,6 +686,71 @@ public class TestPhysicalPlanner {
   }
 
   @Test
+  public final void testPartitionedStorePlanWithMaxFileSize() throws IOException, PlanningException {
+
+    // Preparing working dir and input fragments
+    FileFragment[] frags = StorageManager.splitNG(conf, "default.score_large", largeScore.getMeta(),
+        largeScore.getPath(), Integer.MAX_VALUE);
+    QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan);
+    Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlanWithMaxFileSize");
+
+    // Setting session variables
+    QueryContext queryContext = new QueryContext(conf, session);
+    queryContext.setInt(SessionVars.MAX_OUTPUT_FILE_SIZE, 1);
+
+    // Preparing task context
+    TaskAttemptContext ctx = new TaskAttemptContext(queryContext, id, new FileFragment[] { frags[0] }, workDir);
+    ctx.setOutputPath(new Path(workDir, "part-01-000000"));
+    // SortBasedColumnPartitionStoreExec will be chosen by default.
+    ctx.setEnforcer(new Enforcer());
+    Expr context = analyzer.parse(CreateTableAsStmts[4]);
+    LogicalPlan plan = planner.createPlan(queryContext, context);
+    LogicalNode rootNode = optimizer.optimize(plan);
+
+    // Executing CREATE TABLE PARTITION BY
+    PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm);
+    PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode);
+    exec.init();
+    exec.next();
+    exec.close();
+
+    FileSystem fs = sm.getFileSystem();
+    FileStatus [] list = fs.listStatus(workDir);
+    // checking the number of partitions
+    assertEquals(2, list.length);
+
+    List<FileFragment> fragments = Lists.newArrayList();
+    int i = 0;
+    for (FileStatus status : list) {
+      assertTrue(status.isDirectory());
+
+      long fileVolumSum = 0;
+      FileStatus [] fileStatuses = fs.listStatus(status.getPath());
+      for (FileStatus fileStatus : fileStatuses) {
+        fileVolumSum += fileStatus.getLen();
+        fragments.add(new FileFragment("partition", fileStatus.getPath(), 0, fileStatus.getLen()));
+      }
+      assertTrue("checking the meaningfulness of test", fileVolumSum > StorageUnit.MB
&& fileStatuses.length > 1);
+
+      long expectedFileNum = (long) Math.ceil(fileVolumSum / (float)StorageUnit.MB);
+      assertEquals(expectedFileNum, fileStatuses.length);
+    }
+    TableMeta outputMeta = CatalogUtil.newTableMeta(StoreType.CSV);
+    Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments));
+    scanner.init();
+
+    long rowNum = 0;
+    while (scanner.next() != null) {
+      rowNum++;
+    }
+
+    // checking the number of all written rows
+    assertTrue(largeScore.getStats().getNumRows() == rowNum);
+
+    scanner.close();
+  }
+
+  @Test
   public final void testPartitionedStorePlanWithEmptyGroupingSet()
       throws IOException, PlanningException {
     FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(),

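Both new tests derive the expected number of punctuated files from the written volume and the 1 MB per-file limit; the arithmetic in isolation:

    // expectedFileNum = ceil(totalBytes / maxPerFileBytes), as computed in the tests above.
    static int expectedFileNum(long totalBytes, long maxPerFileBytes) {
      return (int) Math.ceil(totalBytes / (float) maxPerFileBytes);
    }
    // e.g. expectedFileNum(3500000L, 1 << 20) == 4   (about 3.34 MB rounds up to 4 files)
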
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
index 5b42cbd..c5e96ac 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/Appender.java
@@ -31,6 +31,8 @@ public interface Appender extends Closeable {
   
   void flush() throws IOException;
 
+  long getEstimatedOutputSize() throws IOException;
+  
   void close() throws IOException;
 
   void enableStats();

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
index 064841f..04278e9 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -57,5 +57,9 @@ public abstract class FileAppender implements Appender {
     this.enabledStats = true;
   }
 
+  public long getEstimatedOutputSize() throws IOException {
+    return getOffset();
+  }
+
   public abstract long getOffset() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
index 07fa16b..98eaafc 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageUtil.java
@@ -240,4 +240,25 @@ public class StorageUtil extends StorageConstants {
       amt -= ret;
     }
   }
+
+  /**
+   * Set nullChar to TableMeta according to file format
+   *
+   * @param meta TableMeta
+   * @param nullChar A character for NULL representation
+   */
+  public static void setNullCharForTextSerializer(TableMeta meta, String nullChar) {
+    switch (meta.getStoreType()) {
+    case CSV:
+      meta.putOption(StorageConstants.CSVFILE_NULL, nullChar);
+      break;
+    case RCFILE:
+      meta.putOption(StorageConstants.RCFILE_NULL, nullChar);
+      break;
+    case SEQUENCEFILE:
+      meta.putOption(StorageConstants.SEQUENCEFILE_NULL, nullChar);
+      break;
+    default: // nothing to do
+    }
+  }
 }

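A short usage sketch of the new helper (fragment; assumes Tajo's catalog classes on the classpath, and "\\N" is just an example null marker):

    TableMeta meta = CatalogUtil.newTableMeta(StoreType.CSV);
    StorageUtil.setNullCharForTextSerializer(meta, "\\N");
    // For a CSV table this is equivalent to meta.putOption(StorageConstants.CSVFILE_NULL, "\\N");
    // RCFILE and SEQUENCEFILE tables get their own null-marker keys, and any other store type
    // (e.g. RAW or PARQUET) is left untouched.
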
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
index 10b9331..ff9e43c 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java
@@ -129,6 +129,10 @@ public class ParquetAppender extends FileAppender {
     writer.close();
   }
 
+  public long getEstimatedOutputSize() throws IOException {
+    return writer.getEstimatedWrittenSize();
+  }
+
   /**
    * If table statistics is enabled, retrieve the table statistics.
    *

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
index 8ce4b1c..7410d2b 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordWriter.java
@@ -139,6 +139,10 @@ class InternalParquetRecordWriter<T> {
     }
   }
 
+  public long getEstimatedWrittenSize() throws IOException {
+    return w.getPos() + store.memSize();
+  }
+
   private void flushStore()
       throws IOException {
     LOG.info(format("Flushing mem store to file. allocated memory: %,d", store.allocatedSize()));

http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
index 743d168..0447a47 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
@@ -210,6 +210,10 @@ public class ParquetWriter<T> implements Closeable {
     }
   }
 
+  public long getEstimatedWrittenSize() throws IOException {
+    return this.writer.getEstimatedWrittenSize();
+  }
+
   @Override
   public void close() throws IOException {
     try {

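Taken together, the Parquet-side estimate is the number of bytes already flushed to the file (w.getPos()) plus whatever the in-memory column store currently holds (store.memSize()). A numeric sketch with hypothetical values showing why the buffered part matters for the 1 MB rotation check:

    long bytesFlushedToFile    = 900000L;   // hypothetical w.getPos()
    long bytesBufferedInMemory = 200000L;   // hypothetical store.memSize()
    long estimatedOutputSize   = bytesFlushedToFile + bytesBufferedInMemory;
    // 1,100,000 bytes > 1 MB (1,048,576 bytes), so the store executor would rotate the file here,
    // even though the data actually on disk is still below the limit.
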
http://git-wip-us.apache.org/repos/asf/tajo/blob/49d52553/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
index 61f4682..cae0357 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestCompressionStorages.java
@@ -21,7 +21,6 @@ package org.apache.tajo.storage;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Writable;
@@ -48,7 +47,6 @@ import java.util.Arrays;
 import java.util.Collection;
 
 import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestCompressionStorages {

