tajo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject tajo git commit: TAJO-1928: Can't read parquet on hive meta.
Date Sat, 17 Oct 2015 02:36:40 GMT
Repository: tajo
Updated Branches:
  refs/heads/master 54bf51678 -> ec1337ff2


TAJO-1928: Can't read parquet on hive meta.

Closes #826


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ec1337ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ec1337ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ec1337ff

Branch: refs/heads/master
Commit: ec1337ff22c1aaff51918f786ff559a2cd8b6f29
Parents: 54bf516
Author: Jinho Kim <jhkim@apache.org>
Authored: Fri Oct 16 15:40:30 2015 +0900
Committer: Jinho Kim <jhkim@apache.org>
Committed: Fri Oct 16 15:40:30 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   1 +
 .../tajo/catalog/store/HiveCatalogStore.java    | 132 +++++++++++--------
 .../tajo/catalog/store/HiveCatalogUtil.java     |  65 +++++----
 .../catalog/store/TestHiveCatalogStore.java     |  62 +++++++--
 .../apache/tajo/storage/StorageConstants.java   |   2 +-
 tajo-dist/src/main/conf/tajo-env.cmd            |   2 +-
 .../apache/tajo/storage/orc/ORCAppender.java    |   2 +-
 7 files changed, 176 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ab2e45f..974e6e1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -338,6 +338,7 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1928: Can't read parquet on hive meta. (jinho)
 
     TAJO-1923: Selecting on information_schema.table_stats throws an internal 
     error. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index f4e21dd..6355a5f 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -28,11 +28,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.Expr;
@@ -52,6 +57,7 @@ import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 import org.apache.thrift.TException;
+import parquet.hadoop.ParquetOutputFormat;
 
 import java.io.File;
 import java.io.IOException;
@@ -61,9 +67,10 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
   protected final Log LOG = LogFactory.getLog(getClass());
 
   private static String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir";
+  private static final int CLIENT_POOL_SIZE = 2;
+  private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
 
   protected Configuration conf;
-  private static final int CLIENT_POOL_SIZE = 2;
   private final HiveCatalogStoreClientPool clientPool;
   private final String defaultTableSpaceUri;
   private final String catalogUri;
@@ -104,12 +111,27 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
     return exist;
   }
 
+  protected org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String databaseName, final
String tableName)
+      throws UndefinedTableException {
+
+    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+    try {
+      client = clientPool.getClient();
+      return HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+    } catch (NoSuchObjectException nsoe) {
+      throw new UndefinedTableException(tableName);
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    } finally {
+      if (client != null) client.release();
+    }
+  }
+
   @Override
   public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName)
       throws UndefinedTableException {
 
     org.apache.hadoop.hive.ql.metadata.Table table = null;
-    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
     Path path = null;
     String dataFormat = null;
     org.apache.tajo.catalog.Schema schema = null;
@@ -122,21 +144,14 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
     //////////////////////////////////
     try {
       // get hive table schema
-      try {
-        client = clientPool.getClient();
-        table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
-        path = table.getPath();
-      } catch (NoSuchObjectException nsoe) {
-        throw new UndefinedTableException(tableName);
-      } catch (Exception e) {
-        throw new TajoInternalError(e);
-      }
+      table = getHiveTable(databaseName, tableName);
+      path = table.getPath();
 
       // convert HiveCatalogStore field schema into tajo field schema.
       schema = new org.apache.tajo.catalog.Schema();
 
       List<FieldSchema> fieldSchemaList = table.getCols();
-      boolean isPartitionKey = false;
+      boolean isPartitionKey;
       for (FieldSchema eachField : fieldSchemaList) {
         isPartitionKey = false;
 
@@ -184,14 +199,13 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
         }
         options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);
 
-        // set file output format
-        String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
-        dataFormat = HiveCatalogUtil.getDataFormat(fileOutputformat);
 
-        if (dataFormat.equalsIgnoreCase("TEXT")) {
+        dataFormat = HiveCatalogUtil.getDataFormat(table.getSd());
+        if (BuiltinStorages.TEXT.equals(dataFormat)) {
           options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
           options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat));
-        } else if (dataFormat.equals("RCFILE")) {
+
+        } else if (BuiltinStorages.RCFILE.equals(dataFormat)) {
           options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
           String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
           if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
@@ -199,7 +213,8 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
           } else if (ColumnarSerDe.class.getName().equals(serde)) {
             options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
           }
-        } else if (dataFormat.equals("SEQUENCEFILE")) {
+
+        } else if (BuiltinStorages.SEQUENCE_FILE.equals(dataFormat)) {
           options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
           options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
           String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
@@ -208,6 +223,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
           } else if (LazySimpleSerDe.class.getName().equals(serde)) {
             options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
           }
+
         }
 
         // set data size
@@ -255,9 +271,8 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
       }
     } catch (Throwable t) {
       throw new TajoInternalError(t);
-    } finally {
-      if(client != null) client.release();
     }
+
     TableMeta meta = new TableMeta(dataFormat, options);
     TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path.toUri());
     if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
@@ -341,7 +356,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
 
   @Override
   public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) {
-    throw new TajoRuntimeException(new UnsupportedException("Tablespace in HiveMeta"));
+    // SKIP
   }
 
   @Override
@@ -442,21 +457,18 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
       sd.getSerdeInfo().setParameters(new HashMap<>());
       sd.getSerdeInfo().setName(table.getTableName());
 
-      // if tajo set location method, thrift client make exception as follows:
-      // Caused by: MetaException(message:java.lang.NullPointerException)
-      // If you want to modify table path, you have to modify on Hive cli.
-      if (tableDesc.isExternal()) {
-        table.setTableType(TableType.EXTERNAL_TABLE.name());
-        table.putToParameters("EXTERNAL", "TRUE");
-
-        Path tablePath = new Path(tableDesc.getUri());
-        FileSystem fs = tablePath.getFileSystem(conf);
-        if (fs.isFile(tablePath)) {
-          LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
-          sd.setLocation(tablePath.getParent().toString());
-        } else {
-          sd.setLocation(tablePath.toString());
-        }
+      // If the table type were MANAGED_TABLE, the location would default to the
+      // Hive warehouse dir, which is the wrong path during output committing.
+      table.setTableType(TableType.EXTERNAL_TABLE.name());
+      table.putToParameters("EXTERNAL", "TRUE");
+
+      Path tablePath = new Path(tableDesc.getUri());
+      FileSystem fs = tablePath.getFileSystem(conf);
+      if (fs.isFile(tablePath)) {
+        LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
+        sd.setLocation(tablePath.getParent().toString());
+      } else {
+        sd.setLocation(tablePath.toString());
       }
 
       // set column information
@@ -480,14 +492,15 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
       }
 
       if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.RCFILE)) {
+        StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.RCFILE);
+        sd.setInputFormat(descriptor.getInputFormat());
+        sd.setOutputFormat(descriptor.getOutputFormat());
+
         String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE);
-        sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
-        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
-          sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
         } else {
-          sd.getSerdeInfo().setSerializationLib(
-              org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(LazyBinaryColumnarSerDe.class.getName());
         }
 
         if (tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) {
@@ -495,9 +508,10 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL)));
         }
       } else if (tableDesc.getMeta().getDataFormat().equals(BuiltinStorages.TEXT)) {
-        sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
-        sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
-        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
+        // TextFileStorageFormatDescriptor references a deprecated class, so the class names are set directly here.
+        sd.setInputFormat(TextInputFormat.class.getName());
+        sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getName());
+        sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
 
         String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER,
             StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -519,12 +533,14 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
           table.getParameters().remove(StorageConstants.TEXT_NULL);
         }
       } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE))
{
+        StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.SEQUENCEFILE);
+        sd.setInputFormat(descriptor.getInputFormat());
+        sd.setOutputFormat(descriptor.getOutputFormat());
+
         String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE);
-        sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
-        sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName());
 
         if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
-          sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
 
           String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
               StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -540,7 +556,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
               StringEscapeUtils.unescapeJava(fieldDelimiter));
           table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER);
         } else {
-          sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName());
+          sd.getSerdeInfo().setSerializationLib(LazyBinarySerDe.class.getName());
         }
 
         if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) {
@@ -548,14 +564,18 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
               StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL)));
           table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL);
         }
-      } else {
-        if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET))
{
-          sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName());
-          sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName());
-          sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName());
-        } else {
-          throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore");
+      } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET))
{
+        StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.PARQUET);
+        sd.setInputFormat(descriptor.getInputFormat());
+        sd.setOutputFormat(descriptor.getOutputFormat());
+        sd.getSerdeInfo().setSerializationLib(descriptor.getSerde());
+
+        if (tableDesc.getMeta().containsOption(ParquetOutputFormat.COMPRESSION)) {
+          table.putToParameters(ParquetOutputFormat.COMPRESSION,
+              tableDesc.getMeta().getOption(ParquetOutputFormat.COMPRESSION));
         }
+      } else {
+        throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore");
       }
 
       sd.setSortCols(new ArrayList<>());
@@ -1293,6 +1313,6 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore
{
 
   @Override
   public List<TablespaceProto> getTablespaces() {
-    throw new UnsupportedOperationException();
+    return Lists.newArrayList(getTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME));
   }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
index 9e1da2b..bbb7ade 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
@@ -20,17 +20,25 @@ package org.apache.tajo.catalog.store;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.exception.*;
+import org.apache.tajo.exception.LMDNoMatchedDatatypeException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnknownDataFormatException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.thrift.TException;
-import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
 
 public class HiveCatalogUtil {
   public static void validateSchema(Table tblSchema) {
@@ -99,25 +107,38 @@ public class HiveCatalogUtil {
     }
   }
 
-  public static String getDataFormat(String fileFormat) {
-    Preconditions.checkNotNull(fileFormat);
+  public static String getDataFormat(StorageDescriptor descriptor) {
+    Preconditions.checkNotNull(descriptor);
 
-    String[] fileFormatArrary = fileFormat.split("\\.");
-    if(fileFormatArrary.length < 1) {
-      throw new TajoRuntimeException(new UnknownDataFormatException(fileFormat));
-    }
+    String serde = descriptor.getSerdeInfo().getSerializationLib();
+    String inputFormat = descriptor.getInputFormat();
 
-    String outputFormatClass = fileFormatArrary[fileFormatArrary.length-1];
-    if(outputFormatClass.equals(HiveIgnoreKeyTextOutputFormat.class.getSimpleName())) {
-      return BuiltinStorages.TEXT;
-    } else if(outputFormatClass.equals(HiveSequenceFileOutputFormat.class.getSimpleName()))
{
-      return CatalogProtos.DataFormat.SEQUENCEFILE.name();
-    } else if(outputFormatClass.equals(RCFileOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.RCFILE.name();
-    } else if(outputFormatClass.equals(DeprecatedParquetOutputFormat.class.getSimpleName()))
{
-      return CatalogProtos.DataFormat.PARQUET.name();
+    if (LazySimpleSerDe.class.getName().equals(serde)) {
+      if (TextInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.TEXT;
+      } else if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.SEQUENCE_FILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (LazyBinarySerDe.class.getName().equals(serde)) {
+      if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.SEQUENCE_FILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (LazyBinaryColumnarSerDe.class.getName().equals(serde) || ColumnarSerDe.class.getName().equals(serde))
{
+      if (RCFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.RCFILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (ParquetHiveSerDe.class.getName().equals(serde)) {
+      return BuiltinStorages.PARQUET;
+    } else if (AvroSerDe.class.getName().equals(serde)) {
+      return BuiltinStorages.AVRO;
     } else {
-      throw new TajoRuntimeException(new UnknownDataFormatException(fileFormat));
+      throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index 43de047..7608a7b 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -24,6 +24,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -60,9 +65,11 @@ public class TestHiveCatalogStore {
 
   private static HiveCatalogStore store;
   private static Path warehousePath;
+  private static StorageFormatFactory formatFactory;
 
   @BeforeClass
   public static void setUp() throws Exception {
+    formatFactory = new StorageFormatFactory();
     Path testPath = CommonTestingUtil.getTestDir();
     warehousePath = new Path(testPath, "warehouse");
 
@@ -86,7 +93,7 @@ public class TestHiveCatalogStore {
 
   @Test
   public void testTableUsingTextFile() throws Exception {
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("c_custkey", TajoDataTypes.Type.INT4);
@@ -103,6 +110,12 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, CUSTOMER));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.TEXTFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    //IgnoreKeyTextOutputFormat was deprecated
+    assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -120,7 +133,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingRCFileWithBinarySerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
-    TableMeta meta = new TableMeta("RCFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -132,6 +145,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.RCFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -149,7 +167,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingRCFileWithTextSerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
-    TableMeta meta = new TableMeta("RCFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -161,6 +179,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.RCFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -178,7 +201,7 @@ public class TestHiveCatalogStore {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava("\u0002"));
     options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava("\u0003"));
-    TableMeta meta = new TableMeta("TEXT", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("s_suppkey", TajoDataTypes.Type.INT4);
@@ -195,6 +218,12 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, SUPPLIER));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.TEXTFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, SUPPLIER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    //IgnoreKeyTextOutputFormat was deprecated
+    assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -470,7 +499,7 @@ public class TestHiveCatalogStore {
 
   @Test
   public void testGetAllTableNames() throws Exception{
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
@@ -498,7 +527,7 @@ public class TestHiveCatalogStore {
 
   @Test
   public void testDeleteTable() throws Exception {
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("n_name", TajoDataTypes.Type.TEXT);
     schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4);
@@ -522,7 +551,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingSequenceFileWithBinarySerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
-    TableMeta meta = new TableMeta("SEQUENCEFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -534,6 +563,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.SEQUENCEFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -551,7 +585,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingSequenceFileWithTextSerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
-    TableMeta meta = new TableMeta("SEQUENCEFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options);
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -563,6 +597,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.SEQUENCEFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -595,6 +634,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, CUSTOMER));
 
+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.PARQUET);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -610,7 +654,7 @@ public class TestHiveCatalogStore {
   public void testDataTypeCompatibility() throws Exception {
     String tableName = CatalogUtil.normalizeIdentifier("testDataTypeCompatibility");
 
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());
 
     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("col1", TajoDataTypes.Type.INT4);

http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
----------------------------------------------------------------------
diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
index ba0c37b..bb053cc 100644
--- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
+++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java
@@ -85,7 +85,7 @@ public class StorageConstants {
   public static final String ORC_STRIPE_SIZE = "orc.stripe.size";
   public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB
 
-  public static final String ORC_COMPRESSION_KIND = "orc.compression.kind";
+  public static final String ORC_COMPRESSION = "orc.compress";
   public static final String ORC_COMPRESSION_KIND_NONE = "none";
   public static final String ORC_COMPRESSION_KIND_SNAPPY = "snappy";
   public static final String ORC_COMPRESSION_KIND_LZO = "lzo";

http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-dist/src/main/conf/tajo-env.cmd
----------------------------------------------------------------------
diff --git a/tajo-dist/src/main/conf/tajo-env.cmd b/tajo-dist/src/main/conf/tajo-env.cmd
index f005430..4040a4a 100644
--- a/tajo-dist/src/main/conf/tajo-env.cmd
+++ b/tajo-dist/src/main/conf/tajo-env.cmd
@@ -68,7 +68,7 @@ set JAVA_HOME=%JAVA_HOME%
 @rem Tajo cluster mode. the default mode is standby mode.
 set TAJO_WORKER_STANDBY_MODE=true
 
-@rem It must be required to use HCatalogStore
+@rem It must be required to use HiveCatalogStore
 @rem set HIVE_HOME=
 @rem set HIVE_JDBC_DRIVER_DIR=
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
index 4544ed3..dbbf5a6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
@@ -111,7 +111,7 @@ public class ORCAppender extends FileAppender {
   }
 
   private CompressionKind getCompressionKind() {
-    String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION_KIND, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);
+    String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);
 
     if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_ZIP)) {
       return CompressionKind.ZLIB;


Mime
View raw message