Return-Path: X-Original-To: apmail-tajo-commits-archive@minotaur.apache.org Delivered-To: apmail-tajo-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 00E3A106E7 for ; Fri, 17 Apr 2015 08:33:29 +0000 (UTC) Received: (qmail 26433 invoked by uid 500); 17 Apr 2015 08:33:28 -0000 Delivered-To: apmail-tajo-commits-archive@tajo.apache.org Received: (qmail 26329 invoked by uid 500); 17 Apr 2015 08:33:28 -0000 Mailing-List: contact commits-help@tajo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tajo.apache.org Delivered-To: mailing list commits@tajo.apache.org Received: (qmail 26295 invoked by uid 99); 17 Apr 2015 08:33:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Apr 2015 08:33:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 99300DFC4F; Fri, 17 Apr 2015 08:33:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jihoonson@apache.org To: commits@tajo.apache.org Date: Fri, 17 Apr 2015 08:33:32 -0000 Message-Id: <28fbc20d2f9b4693a799876e72288ef2@git.apache.org> In-Reply-To: <68072732fe504e7b8491208d8045703f@git.apache.org> References: <68072732fe504e7b8491208d8045703f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/8] tajo git commit: TAJO-1442: Improve Hive Compatibility http://git-wip-us.apache.org/repos/asf/tajo/blob/955a7bf8/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java deleted file mode 100644 index 32ab674..0000000 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/test/java/org/apache/tajo/catalog/store/TestHCatalogStore.java +++ /dev/null @@ -1,467 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.catalog.store; - - -import org.apache.commons.lang.StringEscapeUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.partition.PartitionDesc; -import org.apache.tajo.catalog.partition.PartitionKey; -import org.apache.tajo.catalog.partition.PartitionMethodDesc; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.util.CommonTestingUtil; -import org.apache.tajo.util.KeyValueSet; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; -import static org.junit.Assert.*; - -/** - * TestHCatalogStore. Test case for - * {@link org.apache.tajo.catalog.store.HCatalogStore} - */ - -public class TestHCatalogStore { - private static final String DB_NAME = "test_hive"; - private static final String CUSTOMER = "customer"; - private static final String NATION = "nation"; - private static final String REGION = "region"; - private static final String SUPPLIER = "supplier"; - - private static HCatalogStore store; - private static Path warehousePath; - - @BeforeClass - public static void setUp() throws Exception { - Path testPath = CommonTestingUtil.getTestDir(); - warehousePath = new Path(testPath, "warehouse"); - - //create local hiveMeta - HiveConf conf = new HiveConf(); - String jdbcUri = "jdbc:derby:;databaseName="+testPath.toUri().getPath()+"metastore_db;create=true"; - conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath.toUri().toString()); - conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, jdbcUri); - conf.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, warehousePath.toUri().toString()); - - // create local HCatalogStore. 
- TajoConf tajoConf = new TajoConf(conf); - store = new HCatalogStore(tajoConf); - store.createDatabase(DB_NAME, null); - } - - @AfterClass - public static void tearDown() throws IOException { - store.close(); - } - - @Test - public void testTableUsingTextFile() throws Exception { - TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet()); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("c_custkey", TajoDataTypes.Type.INT4); - schema.addColumn("c_name", TajoDataTypes.Type.TEXT); - schema.addColumn("c_address", TajoDataTypes.Type.TEXT); - schema.addColumn("c_nationkey", TajoDataTypes.Type.INT4); - schema.addColumn("c_phone", TajoDataTypes.Type.TEXT); - schema.addColumn("c_acctbal", TajoDataTypes.Type.FLOAT8); - schema.addColumn("c_mktsegment", TajoDataTypes.Type.TEXT); - schema.addColumn("c_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, CUSTOMER), schema, meta, - new Path(warehousePath, new Path(DB_NAME, CUSTOMER)).toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, CUSTOMER)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - assertEquals(StringEscapeUtils.escapeJava(StorageConstants.DEFAULT_FIELD_DELIMITER), - table1.getMeta().getOption(StorageConstants.TEXT_DELIMITER)); - store.dropTable(DB_NAME, CUSTOMER); - } - - @Test - public void testTableUsingRCFileWithBinarySerde() throws Exception { - KeyValueSet options = new KeyValueSet(); - options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); - TableMeta meta = new TableMeta(CatalogProtos.StoreType.RCFILE, options); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4); - schema.addColumn("r_name", TajoDataTypes.Type.TEXT); - schema.addColumn("r_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, REGION), schema, meta, - new Path(warehousePath, new Path(DB_NAME, REGION)).toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, REGION)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - assertEquals(StorageConstants.DEFAULT_BINARY_SERDE, - table1.getMeta().getOption(StorageConstants.RCFILE_SERDE)); - store.dropTable(DB_NAME, REGION); - } - - @Test - public void testTableUsingRCFileWithTextSerde() throws Exception { - KeyValueSet options = new KeyValueSet(); - options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); - TableMeta meta = new TableMeta(CatalogProtos.StoreType.RCFILE, options); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4); - 
schema.addColumn("r_name", TajoDataTypes.Type.TEXT); - schema.addColumn("r_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, REGION), schema, meta, - new Path(warehousePath, new Path(DB_NAME, REGION)).toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, REGION)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - assertEquals(StorageConstants.DEFAULT_TEXT_SERDE, table1.getMeta().getOption(StorageConstants.RCFILE_SERDE)); - store.dropTable(DB_NAME, REGION); - } - - @Test - public void testTableWithNullValue() throws Exception { - KeyValueSet options = new KeyValueSet(); - options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava("\u0002")); - options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava("\u0003")); - TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, options); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("s_suppkey", TajoDataTypes.Type.INT4); - schema.addColumn("s_name", TajoDataTypes.Type.TEXT); - schema.addColumn("s_address", TajoDataTypes.Type.TEXT); - schema.addColumn("s_nationkey", TajoDataTypes.Type.INT4); - schema.addColumn("s_phone", TajoDataTypes.Type.TEXT); - schema.addColumn("s_acctbal", TajoDataTypes.Type.FLOAT8); - schema.addColumn("s_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, SUPPLIER), schema, meta, - new Path(warehousePath, new Path(DB_NAME, SUPPLIER)).toUri()); - - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, SUPPLIER)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - assertEquals(table.getMeta().getOption(StorageConstants.TEXT_DELIMITER), - table1.getMeta().getOption(StorageConstants.TEXT_DELIMITER)); - - assertEquals(table.getMeta().getOption(StorageConstants.TEXT_NULL), - table1.getMeta().getOption(StorageConstants.TEXT_NULL)); - - assertEquals(table1.getMeta().getOption(StorageConstants.TEXT_DELIMITER), - StringEscapeUtils.escapeJava("\u0002")); - - assertEquals(table1.getMeta().getOption(StorageConstants.TEXT_NULL), - StringEscapeUtils.escapeJava("\u0003")); - - store.dropTable(DB_NAME, SUPPLIER); - - } - - @Test - public void testAddTableByPartition() throws Exception { - TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet()); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("n_name", TajoDataTypes.Type.TEXT); - schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4); - schema.addColumn("n_comment", TajoDataTypes.Type.TEXT); - - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, NATION), schema, meta, - new Path(warehousePath, new Path(DB_NAME, NATION)).toUri()); - - 
org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema(); - expressionSchema.addColumn("n_nationkey", TajoDataTypes.Type.INT4); - expressionSchema.addColumn("n_date", TajoDataTypes.Type.TEXT); - - PartitionMethodDesc partitions = new PartitionMethodDesc( - DB_NAME, - NATION, - CatalogProtos.PartitionType.COLUMN, "n_nationkey,n_date", expressionSchema); - table.setPartitionMethod(partitions); - - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, NATION)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, NATION)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - Schema partitionSchema = table.getPartitionMethod().getExpressionSchema(); - Schema partitionSchema1 = table1.getPartitionMethod().getExpressionSchema(); - assertEquals(partitionSchema.size(), partitionSchema1.size()); - - for (int i = 0; i < partitionSchema.size(); i++) { - assertEquals(partitionSchema.getColumn(i).getSimpleName(), partitionSchema1.getColumn(i).getSimpleName()); - } - - testAddPartition(table1.getPath(), NATION, "n_nationkey=10/n_date=20150101"); - testAddPartition(table1.getPath(), NATION, "n_nationkey=20/n_date=20150102"); - - testDropPartition(NATION, "n_nationkey=10/n_date=20150101"); - testDropPartition(NATION, "n_nationkey=20/n_date=20150102"); - - CatalogProtos.PartitionDescProto partition = store.getPartition(DB_NAME, NATION, "n_nationkey=10/n_date=20150101"); - assertNull(partition); - - partition = store.getPartition(DB_NAME, NATION, "n_nationkey=20/n_date=20150102"); - assertNull(partition); - - store.dropTable(DB_NAME, NATION); - } - - - private void testAddPartition(URI uri, String tableName, String partitionName) throws Exception { - AlterTableDesc alterTableDesc = new AlterTableDesc(); - alterTableDesc.setTableName(DB_NAME + "." 
+ tableName); - alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); - - Path path = new Path(uri.getPath(), partitionName); - - PartitionDesc partitionDesc = new PartitionDesc(); - partitionDesc.setPartitionName(partitionName); - - List partitionKeyList = new ArrayList(); - String[] partitionNames = partitionName.split("/"); - for(int i = 0; i < partitionNames.length; i++) { - String[] eachPartitionName = partitionNames[i].split("="); - partitionKeyList.add(new PartitionKey(eachPartitionName[0], eachPartitionName[1])); - } - partitionDesc.setPartitionKeys(partitionKeyList); - partitionDesc.setPath(path.toString()); - - alterTableDesc.setPartitionDesc(partitionDesc); - - store.alterTable(alterTableDesc.getProto()); - - CatalogProtos.PartitionDescProto resultDesc = store.getPartition(DB_NAME, NATION, partitionName); - assertNotNull(resultDesc); - assertEquals(resultDesc.getPartitionName(), partitionName); - assertEquals(resultDesc.getPath(), uri.toString() + "/" + partitionName); - assertEquals(resultDesc.getPartitionKeysCount(), 2); - - for (int i = 0; i < resultDesc.getPartitionKeysCount(); i++) { - CatalogProtos.PartitionKeyProto keyProto = resultDesc.getPartitionKeys(i); - String[] eachName = partitionNames[i].split("="); - assertEquals(keyProto.getPartitionValue(), eachName[1]); - } - } - - - private void testDropPartition(String tableName, String partitionName) throws Exception { - AlterTableDesc alterTableDesc = new AlterTableDesc(); - alterTableDesc.setTableName(DB_NAME + "." + tableName); - alterTableDesc.setAlterTableType(AlterTableType.DROP_PARTITION); - - PartitionDesc partitionDesc = new PartitionDesc(); - partitionDesc.setPartitionName(partitionName); - - alterTableDesc.setPartitionDesc(partitionDesc); - - store.alterTable(alterTableDesc.getProto()); - } - - @Test - public void testGetAllTableNames() throws Exception{ - TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet()); - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("n_name", TajoDataTypes.Type.TEXT); - schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4); - schema.addColumn("n_comment", TajoDataTypes.Type.TEXT); - - String[] tableNames = new String[]{"table1", "table2", "table3"}; - - for(String tableName : tableNames){ - TableDesc table = new TableDesc(CatalogUtil.buildFQName("default", tableName), schema, meta, - new Path(warehousePath, new Path(DB_NAME, tableName)).toUri()); - store.createTable(table.getProto()); - } - - List tables = store.getAllTableNames("default"); - assertEquals(tableNames.length, tables.size()); - - for(String tableName : tableNames){ - assertTrue(tables.contains(tableName)); - } - - for(String tableName : tableNames){ - store.dropTable("default", tableName); - } - } - - @Test - public void testDeleteTable() throws Exception { - TableMeta meta = new TableMeta(CatalogProtos.StoreType.CSV, new KeyValueSet()); - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("n_name", TajoDataTypes.Type.TEXT); - schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4); - schema.addColumn("n_comment", TajoDataTypes.Type.TEXT); - - String tableName = "table1"; - TableDesc table = new TableDesc(DB_NAME + "." 
+ tableName, schema, meta, warehousePath.toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, tableName)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, tableName)); - FileSystem fs = FileSystem.getLocal(new Configuration()); - assertTrue(fs.exists(new Path(table1.getPath()))); - - store.dropTable(DB_NAME, tableName); - assertFalse(store.existTable(DB_NAME, tableName)); - fs.close(); - } - - @Test - public void testTableUsingSequenceFileWithBinarySerde() throws Exception { - KeyValueSet options = new KeyValueSet(); - options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); - TableMeta meta = new TableMeta(CatalogProtos.StoreType.SEQUENCEFILE, options); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4); - schema.addColumn("r_name", TajoDataTypes.Type.TEXT); - schema.addColumn("r_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, REGION), schema, meta, - new Path(warehousePath, new Path(DB_NAME, REGION)).toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, REGION)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - assertEquals(StorageConstants.DEFAULT_BINARY_SERDE, - table1.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE)); - store.dropTable(DB_NAME, REGION); - } - - @Test - public void testTableUsingSequenceFileWithTextSerde() throws Exception { - KeyValueSet options = new KeyValueSet(); - options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); - TableMeta meta = new TableMeta(CatalogProtos.StoreType.SEQUENCEFILE, options); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4); - schema.addColumn("r_name", TajoDataTypes.Type.TEXT); - schema.addColumn("r_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, REGION), schema, meta, - new Path(warehousePath, new Path(DB_NAME, REGION)).toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, REGION)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - assertEquals(StorageConstants.DEFAULT_TEXT_SERDE, table1.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE)); - store.dropTable(DB_NAME, REGION); - } - - - @Test - public void testTableUsingParquet() throws Exception { - TableMeta meta = new TableMeta(CatalogProtos.StoreType.PARQUET, new KeyValueSet()); - - org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); - schema.addColumn("c_custkey", TajoDataTypes.Type.INT4); - schema.addColumn("c_name", TajoDataTypes.Type.TEXT); - 
schema.addColumn("c_address", TajoDataTypes.Type.TEXT); - schema.addColumn("c_nationkey", TajoDataTypes.Type.INT4); - schema.addColumn("c_phone", TajoDataTypes.Type.TEXT); - schema.addColumn("c_acctbal", TajoDataTypes.Type.FLOAT8); - schema.addColumn("c_mktsegment", TajoDataTypes.Type.TEXT); - schema.addColumn("c_comment", TajoDataTypes.Type.TEXT); - - TableDesc table = new TableDesc(CatalogUtil.buildFQName(DB_NAME, CUSTOMER), schema, meta, - new Path(warehousePath, new Path(DB_NAME, CUSTOMER)).toUri()); - store.createTable(table.getProto()); - assertTrue(store.existTable(DB_NAME, CUSTOMER)); - - TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER)); - assertEquals(table.getName(), table1.getName()); - assertEquals(table.getPath(), table1.getPath()); - assertEquals(table.getSchema().size(), table1.getSchema().size()); - for (int i = 0; i < table.getSchema().size(); i++) { - assertEquals(table.getSchema().getColumn(i).getSimpleName(), table1.getSchema().getColumn(i).getSimpleName()); - } - - store.dropTable(DB_NAME, CUSTOMER); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/955a7bf8/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml new file mode 100644 index 0000000..16cb170 --- /dev/null +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml @@ -0,0 +1,351 @@ + + + + + + tajo-project + org.apache.tajo + 0.11.0-SNAPSHOT + ../../../tajo-project + + 4.0.0 + tajo-hive + jar + Tajo Catalog Drivers Hive + + UTF-8 + UTF-8 + 1.5.0 + 2.1.0 + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + ${project.build.sourceEncoding} + + + + org.apache.rat + apache-rat-plugin + + + verify + + check + + + + + + derby.log + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + prepare-package + + copy-dependencies + + + runtime + ${project.build.directory}/lib + false + false + true + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + + + org.apache.tajo + tajo-common + + + org.apache.tajo + tajo-catalog-common + + + org.apache.tajo + tajo-catalog-client + + + org.apache.tajo + tajo-catalog-server + + + org.apache.tajo + tajo-rpc-protobuf + + + org.apache.tajo + tajo-storage-common + + + junit + junit + test + + + org.apache.thrift + libfb303 + 0.9.0 + provided + + + org.apache.thrift + libthrift + 0.9.0 + provided + + + org.apache.hadoop + hadoop-mapreduce-client-core + ${hadoop.version} + provided + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + provided + + + org.apache.hive + hive-exec + ${hive.version} + provided + + + org.apache.hive + hive-common + + + org.apache.hive + hive-contrib + + + org.apache.hive + hive-hbase-handler + + + org.apache.hive + hive-metastore + + + org.apache.hive + hive-serde + + + org.apache.hive + hive-shims + + + org.apache.hive + hive-testutils + + + org.apache.thrift + libfb303 + + + org.apache.thrift + libthrift + + + com.jolbox + bonecp + + + com.google.protobuf + protobuf-java + + + org.apache.calcite + calcite-core + + + org.apache.calcite + calcite-avatica + + + + + org.apache.hive + hive-metastore + ${hive.version} + provided + + + org.apache.hive + hive-common + + + org.apache.hive + hive-serde + + + org.apache.hive + hive-shimss + + + org.apache.thrift + libfb303 + + + org.apache.thrift + libthrift + + + com.jolbox + bonecp + + + + + org.apache.hive + hive-cli + 
${hive.version} + provided + + + org.apache.hive + hive-common + + + org.apache.hive + hive-exec + + + org.apache.hive + hive-metastore + + + org.apache.hive + hive-serde + + + org.apache.hive + hive-service + + + org.apache.hive + hive-shims + + + com.jolbox + bonecp + + + jline + jline + + + + + com.twitter + parquet-hive-bundle + ${parquet.version} + + + + + + docs + + false + + + + + org.apache.maven.plugins + maven-javadoc-plugin + + + + module-javadocs + package + + jar + + + ${project.build.directory} + + + + + + + + + src + + false + + + + + org.apache.maven.plugins + maven-source-plugin + + + + hadoop-java-sources + package + + jar-no-fork + + + + + + + + + + + + + org.apache.maven.plugins + maven-surefire-report-plugin + + + + + http://git-wip-us.apache.org/repos/asf/tajo/blob/955a7bf8/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java new file mode 100644 index 0000000..5b1a996 --- /dev/null +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -0,0 +1,964 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.catalog.store; + +import com.google.common.collect.Lists; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.TableType; +import org.apache.hadoop.hive.metastore.api.*; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.exception.*; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; +import org.apache.tajo.catalog.proto.CatalogProtos.DatabaseProto; +import org.apache.tajo.catalog.proto.CatalogProtos.IndexProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableDescriptorProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableOptionProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TableStatsProto; +import org.apache.tajo.catalog.proto.CatalogProtos.TablespaceProto; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.common.exception.NotImplementedException; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.exception.InternalException; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.KeyValueSet; +import org.apache.thrift.TException; + +import java.io.IOException; +import java.util.*; + +import static org.apache.tajo.catalog.proto.CatalogProtos.PartitionType; + +public class HiveCatalogStore extends CatalogConstants implements CatalogStore { + protected final Log LOG = LogFactory.getLog(getClass()); + + private static String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir"; + + protected Configuration conf; + private static final int CLIENT_POOL_SIZE = 2; + private final HiveCatalogStoreClientPool clientPool; + private final String defaultTableSpaceUri; + + public HiveCatalogStore(final Configuration conf) throws InternalException { + if (!(conf instanceof TajoConf)) { + throw new CatalogException("Invalid Configuration Type:" + conf.getClass().getSimpleName()); + } + this.conf = conf; + this.defaultTableSpaceUri = TajoConf.getWarehouseDir((TajoConf) conf).toString(); + this.clientPool = new HiveCatalogStoreClientPool(CLIENT_POOL_SIZE, conf); + } + + @Override + public boolean existTable(final String databaseName, final String tableName) throws CatalogException { + boolean exist = false; + org.apache.hadoop.hive.ql.metadata.Table table; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + // get table + try { + client = clientPool.getClient(); + table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); + if (table != null) { + exist = true; + } + } catch (NoSuchObjectException nsoe) { + exist = false; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + + return exist; + } + + @Override + 
public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName) throws CatalogException { + org.apache.hadoop.hive.ql.metadata.Table table = null; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + Path path = null; + CatalogProtos.StoreType storeType = null; + org.apache.tajo.catalog.Schema schema = null; + KeyValueSet options = null; + TableStats stats = null; + PartitionMethodDesc partitions = null; + + ////////////////////////////////// + // set tajo table schema. + ////////////////////////////////// + try { + // get hive table schema + try { + client = clientPool.getClient(); + table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); + path = table.getPath(); + } catch (NoSuchObjectException nsoe) { + throw new CatalogException("Table not found. - tableName:" + tableName, nsoe); + } catch (Exception e) { + throw new CatalogException(e); + } + + // convert HiveCatalogStore field schema into tajo field schema. + schema = new org.apache.tajo.catalog.Schema(); + + List fieldSchemaList = table.getCols(); + boolean isPartitionKey = false; + for (FieldSchema eachField : fieldSchemaList) { + isPartitionKey = false; + + if (table.getPartitionKeys() != null) { + for (FieldSchema partitionKey : table.getPartitionKeys()) { + if (partitionKey.getName().equals(eachField.getName())) { + isPartitionKey = true; + } + } + } + + if (!isPartitionKey) { + String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName + + CatalogConstants.IDENTIFIER_DELIMITER + eachField.getName(); + TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(eachField.getType().toString()); + schema.addColumn(fieldName, dataType); + } + } + + // validate field schema. + HiveCatalogUtil.validateSchema(table); + + stats = new TableStats(); + options = new KeyValueSet(); + options.putAll(table.getParameters()); + options.remove("EXTERNAL"); + + Properties properties = table.getMetadata(); + if (properties != null) { + // set field delimiter + String fieldDelimiter = "", nullFormat = ""; + if (properties.getProperty(serdeConstants.FIELD_DELIM) != null) { + fieldDelimiter = properties.getProperty(serdeConstants.FIELD_DELIM); + } else { + // if hive table used default row format delimiter, Properties doesn't have it. 
+ // So, Tajo must set as follows: + fieldDelimiter = "\u0001"; + } + + // set null format + if (properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT) != null) { + nullFormat = properties.getProperty(serdeConstants.SERIALIZATION_NULL_FORMAT); + } else { + nullFormat = "\\N"; + } + options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT); + + // set file output format + String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT); + storeType = CatalogUtil.getStoreType(HiveCatalogUtil.getStoreType(fileOutputformat)); + + if (storeType.equals(CatalogProtos.StoreType.TEXTFILE)) { + options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter)); + options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat)); + } else if (storeType.equals(CatalogProtos.StoreType.RCFILE)) { + options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat)); + String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB); + if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) { + options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); + } else if (ColumnarSerDe.class.getName().equals(serde)) { + options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); + } + } else if (storeType.equals(CatalogProtos.StoreType.SEQUENCEFILE) ) { + options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter)); + options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat)); + String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB); + if (LazyBinarySerDe.class.getName().equals(serde)) { + options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); + } else if (LazySimpleSerDe.class.getName().equals(serde)) { + options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); + } + } + + // set data size + long totalSize = 0; + if (properties.getProperty("totalSize") != null) { + totalSize = Long.parseLong(properties.getProperty("totalSize")); + } else { + try { + FileSystem fs = path.getFileSystem(conf); + if (fs.exists(path)) { + totalSize = fs.getContentSummary(path).getLength(); + } + } catch (IOException ioe) { + throw new CatalogException("Fail to get path. - path:" + path.toString(), ioe); + } + } + stats.setNumBytes(totalSize); + } + + // set partition keys + List partitionKeys = table.getPartitionKeys(); + + if (null != partitionKeys) { + org.apache.tajo.catalog.Schema expressionSchema = new org.apache.tajo.catalog.Schema(); + StringBuilder sb = new StringBuilder(); + if (partitionKeys.size() > 0) { + for (int i = 0; i < partitionKeys.size(); i++) { + FieldSchema fieldSchema = partitionKeys.get(i); + TajoDataTypes.Type dataType = HiveCatalogUtil.getTajoFieldType(fieldSchema.getType().toString()); + String fieldName = databaseName + CatalogConstants.IDENTIFIER_DELIMITER + tableName + + CatalogConstants.IDENTIFIER_DELIMITER + fieldSchema.getName(); + expressionSchema.addColumn(new Column(fieldName, dataType)); + if (i > 0) { + sb.append(","); + } + sb.append(fieldSchema.getName()); + } + partitions = new PartitionMethodDesc( + databaseName, + tableName, + PartitionType.COLUMN, + sb.toString(), + expressionSchema); + } + } + } finally { + if(client != null) client.release(); + } + TableMeta meta = new TableMeta(storeType, options); + TableDesc tableDesc = new TableDesc(databaseName + "." 
+ tableName, schema, meta, path.toUri()); + if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) { + tableDesc.setExternal(true); + } + if (stats != null) { + tableDesc.setStats(stats); + } + if (partitions != null) { + tableDesc.setPartitionMethod(partitions); + } + return tableDesc.getProto(); + } + + + private TajoDataTypes.Type getDataType(final String typeStr) { + try { + return Enum.valueOf(TajoDataTypes.Type.class, typeStr); + } catch (IllegalArgumentException iae) { + LOG.error("Cannot find a matched type against from '" + typeStr + "'"); + return null; + } + } + + @Override + public final List getAllTableNames(String databaseName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + return client.getHiveClient().getAllTables(databaseName); + } catch (TException e) { + throw new CatalogException(e); + } finally { + if(client != null) client.release(); + } + } + + @Override + public void createTablespace(String spaceName, String spaceUri) throws CatalogException { + // SKIP + } + + @Override + public boolean existTablespace(String spaceName) throws CatalogException { + // SKIP + return spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME); + } + + @Override + public void dropTablespace(String spaceName) throws CatalogException { + // SKIP + } + + @Override + public Collection getAllTablespaceNames() throws CatalogException { + return Lists.newArrayList(TajoConstants.DEFAULT_TABLESPACE_NAME); + } + + @Override + public TablespaceProto getTablespace(String spaceName) throws CatalogException { + if (spaceName.equals(TajoConstants.DEFAULT_TABLESPACE_NAME)) { + TablespaceProto.Builder builder = TablespaceProto.newBuilder(); + builder.setSpaceName(TajoConstants.DEFAULT_TABLESPACE_NAME); + builder.setUri(defaultTableSpaceUri); + return builder.build(); + } else { + throw new CatalogException("tablespace concept is not supported in HiveCatalogStore"); + } + } + + @Override + public void updateTableStats(CatalogProtos.UpdateTableStatsProto statsProto) throws + CatalogException { + // TODO - not implemented yet + } + + @Override + public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) throws CatalogException { + throw new CatalogException("tablespace concept is not supported in HiveCatalogStore"); + } + + @Override + public void createDatabase(String databaseName, String tablespaceName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + Database database = new Database( + databaseName, + "", + defaultTableSpaceUri + "/" + databaseName, + new HashMap()); + client = clientPool.getClient(); + client.getHiveClient().createDatabase(database); + } catch (AlreadyExistsException e) { + throw new AlreadyExistsDatabaseException(databaseName); + } catch (Throwable t) { + throw new CatalogException(t); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public boolean existDatabase(String databaseName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + List databaseNames = client.getHiveClient().getAllDatabases(); + return databaseNames.contains(databaseName); + } catch (Throwable t) { + throw new CatalogException(t); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public void dropDatabase(String databaseName) throws CatalogException { + 
HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + client.getHiveClient().dropDatabase(databaseName); + } catch (NoSuchObjectException e) { + throw new NoSuchDatabaseException(databaseName); + } catch (Throwable t) { + throw new CatalogException(databaseName); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public Collection getAllDatabaseNames() throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + return client.getHiveClient().getAllDatabases(); + } catch (TException e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public final void createTable(final CatalogProtos.TableDescProto tableDescProto) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + TableDesc tableDesc = new TableDesc(tableDescProto); + String[] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); + String databaseName = splitted[0]; + String tableName = splitted[1]; + + try { + client = clientPool.getClient(); + + org.apache.hadoop.hive.metastore.api.Table table = new org.apache.hadoop.hive.metastore.api.Table(); + table.setDbName(databaseName); + table.setTableName(tableName); + table.setParameters(new HashMap(tableDesc.getMeta().getOptions().getAllKeyValus())); + // TODO: set owner + //table.setOwner(); + + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo()); + sd.getSerdeInfo().setParameters(new HashMap()); + sd.getSerdeInfo().setName(table.getTableName()); + + // if tajo set location method, thrift client make exception as follows: + // Caused by: MetaException(message:java.lang.NullPointerException) + // If you want to modify table path, you have to modify on Hive cli. 
+ if (tableDesc.isExternal()) { + table.setTableType(TableType.EXTERNAL_TABLE.name()); + table.putToParameters("EXTERNAL", "TRUE"); + + Path tablePath = new Path(tableDesc.getPath()); + FileSystem fs = tablePath.getFileSystem(conf); + if (fs.isFile(tablePath)) { + LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path."); + sd.setLocation(tablePath.getParent().toString()); + } else { + sd.setLocation(tablePath.toString()); + } + } + + // set column information + List columns = tableDesc.getSchema().getColumns(); + ArrayList cols = new ArrayList(columns.size()); + + for (Column eachField : columns) { + cols.add(new FieldSchema(eachField.getSimpleName(), + HiveCatalogUtil.getHiveFieldType(eachField.getDataType()), "")); + } + sd.setCols(cols); + + // set partition keys + if (tableDesc.hasPartition() && tableDesc.getPartitionMethod().getPartitionType().equals(PartitionType.COLUMN)) { + List partitionKeys = new ArrayList(); + for (Column eachPartitionKey : tableDesc.getPartitionMethod().getExpressionSchema().getColumns()) { + partitionKeys.add(new FieldSchema(eachPartitionKey.getSimpleName(), + HiveCatalogUtil.getHiveFieldType(eachPartitionKey.getDataType()), "")); + } + table.setPartitionKeys(partitionKeys); + } + + if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.RCFILE)) { + String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE); + sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName()); + if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName()); + } else { + sd.getSerdeInfo().setSerializationLib( + org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName()); + } + + if (tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL))); + } + } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.CSV) + || tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.TEXTFILE)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName()); + + String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER); + + // User can use an unicode for filed delimiter such as \u0001, \001. + // In this case, java console will convert this value into "\\u001". + // And hive will un-espace this value again. + // As a result, user can use right field delimiter. + // So, we have to un-escape this value. 
+ sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + table.getParameters().remove(StorageConstants.TEXT_DELIMITER); + + if (tableDesc.getMeta().containsOption(StorageConstants.TEXT_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.TEXT_NULL))); + table.getParameters().remove(StorageConstants.TEXT_NULL); + } + } else if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.SEQUENCEFILE)) { + String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE); + sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName()); + sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName()); + + if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName()); + + String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER, + StorageConstants.DEFAULT_FIELD_DELIMITER); + + // User can use an unicode for filed delimiter such as \u0001, \001. + // In this case, java console will convert this value into "\\u001". + // And hive will un-espace this value again. + // As a result, user can use right field delimiter. + // So, we have to un-escape this value. + sd.getSerdeInfo().putToParameters(serdeConstants.SERIALIZATION_FORMAT, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + sd.getSerdeInfo().putToParameters(serdeConstants.FIELD_DELIM, + StringEscapeUtils.unescapeJava(fieldDelimiter)); + table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER); + } else { + sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName()); + } + + if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) { + table.putToParameters(serdeConstants.SERIALIZATION_NULL_FORMAT, + StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL))); + table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL); + } + } else { + if (tableDesc.getMeta().getStoreType().equals(CatalogProtos.StoreType.PARQUET)) { + sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName()); + sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName()); + sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName()); + } else { + throw new CatalogException(new NotImplementedException(tableDesc.getMeta().getStoreType + ().name())); + } + } + + sd.setSortCols(new ArrayList()); + + table.setSd(sd); + client.getHiveClient().createTable(table); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if(client != null) client.release(); + } + } + + @Override + public final void dropTable(String databaseName, final String tableName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + client = clientPool.getClient(); + client.getHiveClient().dropTable(databaseName, tableName, false, false); + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + + 
@Override + public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescProto) throws CatalogException { + final String[] split = CatalogUtil.splitFQTableName(alterTableDescProto.getTableName()); + + if (split.length == 1) { + throw new IllegalArgumentException("alterTable() requires a qualified table name, but it is \"" + + alterTableDescProto.getTableName() + "\"."); + } + + final String databaseName = split[0]; + final String tableName = split[1]; + String partitionName = null; + CatalogProtos.PartitionDescProto partitionDesc = null; + + switch (alterTableDescProto.getAlterTableType()) { + case RENAME_TABLE: + if (existTable(databaseName,alterTableDescProto.getNewTableName().toLowerCase())) { + throw new AlreadyExistsTableException(alterTableDescProto.getNewTableName()); + } + renameTable(databaseName, tableName, alterTableDescProto.getNewTableName().toLowerCase()); + break; + case RENAME_COLUMN: + if (existColumn(databaseName,tableName, alterTableDescProto.getAlterColumnName().getNewColumnName())) { + throw new ColumnNameAlreadyExistException(alterTableDescProto.getAlterColumnName().getNewColumnName()); + } + renameColumn(databaseName, tableName, alterTableDescProto.getAlterColumnName()); + break; + case ADD_COLUMN: + if (existColumn(databaseName,tableName, alterTableDescProto.getAddColumn().getName())) { + throw new ColumnNameAlreadyExistException(alterTableDescProto.getAddColumn().getName()); + } + addNewColumn(databaseName, tableName, alterTableDescProto.getAddColumn()); + break; + case ADD_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc != null) { + throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + } + addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc()); + break; + case DROP_PARTITION: + partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); + partitionDesc = getPartition(databaseName, tableName, partitionName); + if(partitionDesc == null) { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + dropPartition(databaseName, tableName, partitionDesc); + break; + case SET_PROPERTY: + // TODO - not implemented yet + break; + default: + //TODO + } + } + + + private void renameTable(String databaseName, String tableName, String newTableName) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + client = clientPool.getClient(); + Table newTable = client.getHiveClient().getTable(databaseName, tableName); + newTable.setTableName(newTableName); + client.getHiveClient().alter_table(databaseName, tableName, newTable); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void renameColumn(String databaseName, String tableName, CatalogProtos.AlterColumnProto alterColumnProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List columns = table.getSd().getCols(); + + for (final FieldSchema currentColumn : columns) { + if (currentColumn.getName().equalsIgnoreCase(alterColumnProto.getOldColumnName())) { + currentColumn.setName(alterColumnProto.getNewColumnName()); + } + } + client.getHiveClient().alter_table(databaseName, tableName, table); + + } catch 
(NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + + private void addNewColumn(String databaseName, String tableName, CatalogProtos.ColumnProto columnProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List columns = table.getSd().getCols(); + columns.add(new FieldSchema(columnProto.getName(), + HiveCatalogUtil.getHiveFieldType(columnProto.getDataType()), "")); + client.getHiveClient().alter_table(databaseName, tableName, table); + + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void addPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + Partition partition = new Partition(); + partition.setDbName(databaseName); + partition.setTableName(tableName); + + List values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + partition.setValues(values); + + Table table = client.getHiveClient().getTable(databaseName, tableName); + StorageDescriptor sd = table.getSd(); + sd.setLocation(partitionDescProto.getPath()); + partition.setSd(sd); + + client.getHiveClient().add_partition(partition); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + private void dropPartition(String databaseName, String tableName, CatalogProtos.PartitionDescProto + partitionDescProto) { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + try { + + client = clientPool.getClient(); + + List values = Lists.newArrayList(); + for(CatalogProtos.PartitionKeyProto keyProto : partitionDescProto.getPartitionKeysList()) { + values.add(keyProto.getPartitionValue()); + } + client.getHiveClient().dropPartition(databaseName, tableName, values, true); + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + } + + @Override + public void addPartitionMethod(CatalogProtos.PartitionMethodProto partitionMethodProto) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public CatalogProtos.PartitionMethodProto getPartitionMethod(String databaseName, String tableName) + throws CatalogException { + return null; // TODO - not implemented yet + } + + @Override + public boolean existPartitionMethod(String databaseName, String tableName) throws CatalogException { + return false; // TODO - not implemented yet + } + + @Override + public void dropPartitionMethod(String databaseName, String tableName) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public List getPartitions(String databaseName, + String tableName) throws CatalogException { + throw new UnsupportedOperationException(); + } + + + @Override + public CatalogProtos.PartitionDescProto getPartition(String databaseName, String tableName, + String partitionName) throws CatalogException { + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + CatalogProtos.PartitionDescProto.Builder builder = null; + + 
try { + client = clientPool.getClient(); + + Partition partition = client.getHiveClient().getPartition(databaseName, tableName, partitionName); + builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partition.getSd().getLocation()); + + String[] partitionNames = partitionName.split("/"); + + for (int i = 0; i < partition.getValues().size(); i++) { + String value = partition.getValues().get(i); + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + + String columnName = partitionNames[i].split("=")[0]; + keyBuilder.setColumnName(columnName); + keyBuilder.setPartitionValue(value); + builder.addPartitionKeys(keyBuilder); + } + } catch (NoSuchObjectException e) { + return null; + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + return builder.build(); + } + + @Override + public final void addFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public final void deleteFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public final void existFunction(final FunctionDesc func) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public final List getAllFunctionNames() throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public void dropIndex(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public boolean existIndexByName(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet + return false; + } + + @Override + public CatalogProtos.IndexDescProto[] getIndexes(String databaseName, String tableName) throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public void createIndex(CatalogProtos.IndexDescProto proto) throws CatalogException { + // TODO - not implemented yet + } + + @Override + public CatalogProtos.IndexDescProto getIndexByName(String databaseName, String indexName) throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public CatalogProtos.IndexDescProto getIndexByColumn(String databaseName, String tableName, String columnName) + throws CatalogException { + // TODO - not implemented yet + return null; + } + + @Override + public boolean existIndexByColumn(String databaseName, String tableName, String columnName) throws CatalogException { + // TODO - not implemented yet + return false; + } + + @Override + public final void close() { + clientPool.close(); + } + + private boolean existColumn(final String databaseName ,final String tableName , final String columnName) throws CatalogException { + boolean exist = false; + HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null; + + try { + + client = clientPool.getClient(); + Table table = client.getHiveClient().getTable(databaseName, tableName); + List columns = table.getSd().getCols(); + + for (final FieldSchema currentColumn : columns) { + if (currentColumn.getName().equalsIgnoreCase(columnName)) { + exist = true; + } + } + client.getHiveClient().alter_table(databaseName, tableName, table); + + } catch (NoSuchObjectException nsoe) { + } catch (Exception e) { + throw new CatalogException(e); + } finally { + if (client != null) { + client.release(); + } + } + + return exist; + } + + 
+  @Override
+  public List<CatalogProtos.ColumnProto> getAllColumns() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.DatabaseProto> getAllDatabases() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.IndexProto> getAllIndexes() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.TablePartitionProto> getAllPartitions() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.TableOptionProto> getAllTableOptions() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.TableStatsProto> getAllTableStats() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.TableDescriptorProto> getAllTables() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<CatalogProtos.TablespaceProto> getTablespaces() throws CatalogException {
+    throw new UnsupportedOperationException();
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/955a7bf8/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java
new file mode 100644
index 0000000..9053c56
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStoreClientPool.java
@@ -0,0 +1,170 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package org.apache.tajo.catalog.store;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.*;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.log4j.Logger;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Manages a pool of HiveMetaStoreClient connections. If the connection pool is empty
+ * a new client is created and added to the pool. There is no size limit.
+ */
+public class HiveCatalogStoreClientPool {
+  private static final Logger LOG = Logger.getLogger(HiveCatalogStoreClientPool.class);
+  private final ConcurrentLinkedQueue<HiveCatalogStoreClient> clientPool =
+      new ConcurrentLinkedQueue<HiveCatalogStoreClient>();
+  private AtomicBoolean poolClosed = new AtomicBoolean(false);
+  private HiveConf hiveConf;
+
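+  // Typical lifecycle of the pool (a sketch; the pool size and configuration
+  // below are illustrative, not part of this commit):
+  //
+  //   HiveCatalogStoreClientPool pool = new HiveCatalogStoreClientPool(2, new HiveConf());
+  //   HiveCatalogStoreClient client = pool.getClient();
+  //   try {
+  //     client.getHiveClient().getAllDatabases();
+  //   } finally {
+  //     client.release();   // hands the client back, or closes it if the pool is closed
+  //   }
+  //   pool.close();
+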
+  /**
+   * A wrapper around the HiveMetaStoreClient that manages interactions with the
+   * connection pool.
+   */
+  public class HiveCatalogStoreClient {
+    private final IMetaStoreClient hiveClient;
+    public AtomicBoolean isInUse = new AtomicBoolean(false);
+
+    private HiveCatalogStoreClient(HiveConf hiveConf) {
+      try {
+        HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() {
+          @Override
+          public HiveMetaHook getHook(Table table) throws MetaException {
+            /* metadata hook implementation, or null if this
+             * storage handler does not need any metadata notifications
+             */
+            return null;
+          }
+        };
+
+        this.hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName());
+        // The caller decides whether this client goes into the pool; enqueueing
+        // it here as well would let the same client be handed out twice.
+        LOG.info("MetaStoreClient created (pool size = " + clientPool.size() + ")");
+      } catch (Exception e) {
+        // Turn into an unchecked exception
+        throw new IllegalStateException(e);
+      }
+    }
+
+    /**
+     * Returns the internal HiveMetaStoreClient object.
+     */
+    public IMetaStoreClient getHiveClient() {
+      return hiveClient;
+    }
+
+    /**
+     * Returns this client back to the connection pool. If the connection pool has been
+     * closed, just close the Hive client connection.
+     */
+    public synchronized void release() {
+      if (!this.isInUse.getAndSet(false)) {
+        return;
+      }
+      // Ensure the connection isn't returned to the pool if the pool has been closed.
+      // This lock is needed to ensure proper behavior when a thread reads poolClosed
+      // is false, but a call to pool.close() comes in immediately afterward.
+      if (poolClosed.get()) {
+        this.getHiveClient().close();
+      } else {
+        clientPool.add(this);
+      }
+    }
+
+    // Marks this client as in use
+    private void markInUse() {
+      isInUse.set(true);
+    }
+  }
+
+  public HiveCatalogStoreClientPool(int initialSize) {
+    this(initialSize, new HiveConf(HiveCatalogStoreClientPool.class));
+  }
+
+  public HiveCatalogStoreClientPool(int initialSize, HiveConf hiveConf) {
+    this.hiveConf = hiveConf;
+    addClients(initialSize);
+  }
+
+  public HiveCatalogStoreClientPool(int initialSize, Configuration conf) {
+    this.hiveConf = new HiveConf();
+    setParameters(conf);
+    addClients(initialSize);
+  }
+
+  public void setParameters(Configuration conf) {
+    for (Iterator<Map.Entry<String, String>> iter = conf.iterator(); iter.hasNext(); ) {
+      Map.Entry<String, String> entry = iter.next();
+      this.hiveConf.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Adds numClients to the client pool.
+   */
+  public void addClients(int numClients) {
+    for (int i = 0; i < numClients; ++i) {
+      clientPool.add(new HiveCatalogStoreClient(hiveConf));
+    }
+  }
+
+  /**
+   * Gets a client from the pool. If the pool is empty a new client is created.
+   */
+  public synchronized HiveCatalogStoreClient getClient() {
+    // The MetaStoreClient c'tor relies on knowing the Hadoop version by asking
+    // org.apache.hadoop.util.VersionInfo. The VersionInfo class relies on opening
+    // the 'common-version-info.properties' file as a resource from hadoop-common*.jar
+    // using the Thread's context classloader. If necessary, set the Thread's context
+    // classloader, otherwise VersionInfo will fail in its c'tor.
+    if (Thread.currentThread().getContextClassLoader() == null) {
+      Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
+    }
+
+    HiveCatalogStoreClient client = clientPool.poll();
+    // The pool was empty so create a new client and return that.
+    if (client == null) {
+      client = new HiveCatalogStoreClient(hiveConf);
+    }
+    client.markInUse();
+
+    return client;
+  }
+
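+  // Because the pool is unbounded, getClient() never blocks: it polls the queue
+  // and, when the queue is empty, allocates a fresh client (a sketch, with an
+  // illustrative initial size):
+  //
+  //   HiveCatalogStoreClientPool pool = new HiveCatalogStoreClientPool(1);
+  //   HiveCatalogStoreClient first = pool.getClient();   // taken from the pool
+  //   HiveCatalogStoreClient second = pool.getClient();  // pool empty -> new client
+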
+  /**
+   * Removes all items from the connection pool and closes all Hive Meta Store client
+   * connections. Can be called multiple times.
+   */
+  public void close() {
+    // Ensure no more items get added to the pool once close is called.
+    if (poolClosed.getAndSet(true)) {
+      return;
+    }
+
+    HiveCatalogStoreClient client = null;
+    while ((client = clientPool.poll()) != null) {
+      client.getHiveClient().close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/955a7bf8/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
new file mode 100644
index 0000000..59910b8
--- /dev/null
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tajo.catalog.store;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.tajo.catalog.exception.CatalogException;
+import org.apache.tajo.catalog.proto.CatalogProtos;
+import org.apache.tajo.catalog.CatalogUtil;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.thrift.TException;
+import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;
+
+public class HiveCatalogUtil {
+  public static void validateSchema(Table tblSchema) throws CatalogException {
+    for (FieldSchema fieldSchema : tblSchema.getCols()) {
+      String fieldType = fieldSchema.getType();
+      if (fieldType.equalsIgnoreCase("ARRAY") || fieldType.equalsIgnoreCase("STRUCT")
+          || fieldType.equalsIgnoreCase("MAP")) {
+        throw new CatalogException("Unsupported field type: " + fieldType.toUpperCase());
+      }
+    }
+  }
+
+  public static TajoDataTypes.Type getTajoFieldType(String fieldType) {
+    Preconditions.checkNotNull(fieldType);
+
+    if (fieldType.equalsIgnoreCase(serdeConstants.INT_TYPE_NAME)) {
+      return TajoDataTypes.Type.INT4;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.TINYINT_TYPE_NAME)) {
+      return TajoDataTypes.Type.INT1;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.SMALLINT_TYPE_NAME)) {
+      return TajoDataTypes.Type.INT2;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
+      return TajoDataTypes.Type.INT8;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.BOOLEAN_TYPE_NAME)) {
+      return TajoDataTypes.Type.BOOLEAN;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) {
+      return TajoDataTypes.Type.FLOAT4;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) {
+      return TajoDataTypes.Type.FLOAT8;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)) {
+      return TajoDataTypes.Type.TEXT;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.BINARY_TYPE_NAME)) {
+      return TajoDataTypes.Type.BLOB;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.TIMESTAMP_TYPE_NAME)) {
+      return TajoDataTypes.Type.TIMESTAMP;
+    } else if (fieldType.equalsIgnoreCase(serdeConstants.DATE_TYPE_NAME)) {
+      return TajoDataTypes.Type.DATE;
+    } else {
+      throw new CatalogException("Cannot find a matching Tajo type for Hive field type '" + fieldType + "'");
+    }
+  }
+
+  public static String getHiveFieldType(TajoDataTypes.DataType dataType) {
+    Preconditions.checkNotNull(dataType);
+
+    switch (dataType.getType()) {
+    case CHAR: return serdeConstants.CHAR_TYPE_NAME;
+    case BOOLEAN: return serdeConstants.BOOLEAN_TYPE_NAME;
+    case INT1: return serdeConstants.TINYINT_TYPE_NAME;
+    case INT2: return serdeConstants.SMALLINT_TYPE_NAME;
+    case INT4: return serdeConstants.INT_TYPE_NAME;
+    case INT8: return serdeConstants.BIGINT_TYPE_NAME;
+    case FLOAT4: return serdeConstants.FLOAT_TYPE_NAME;
+    case FLOAT8: return serdeConstants.DOUBLE_TYPE_NAME;
+    case TEXT: return serdeConstants.STRING_TYPE_NAME;
+    case VARCHAR: return serdeConstants.VARCHAR_TYPE_NAME;
+    case NCHAR: return serdeConstants.VARCHAR_TYPE_NAME;
+    case NVARCHAR: return serdeConstants.VARCHAR_TYPE_NAME;
+    case BINARY: return serdeConstants.BINARY_TYPE_NAME;
+    case VARBINARY: return serdeConstants.BINARY_TYPE_NAME;
+    case BLOB: return serdeConstants.BINARY_TYPE_NAME;
+    case DATE: return serdeConstants.DATE_TYPE_NAME;
+    case TIMESTAMP: return serdeConstants.TIMESTAMP_TYPE_NAME;
+    default:
+      throw new CatalogException(dataType + " is not supported.");
+    }
+  }
+
+  public static String getStoreType(String fileFormat) {
+    Preconditions.checkNotNull(fileFormat);
+
+    String[] fileFormatArray = fileFormat.split("\\.");
+    if (fileFormatArray.length < 1) {
+      throw new CatalogException("Hive file output format is wrong. - file output format: " + fileFormat);
+    }
+
+    String outputFormatClass = fileFormatArray[fileFormatArray.length - 1];
+    if (outputFormatClass.equals(HiveIgnoreKeyTextOutputFormat.class.getSimpleName())) {
+      return CatalogUtil.TEXTFILE_NAME;
+    } else if (outputFormatClass.equals(HiveSequenceFileOutputFormat.class.getSimpleName())) {
+      return CatalogProtos.StoreType.SEQUENCEFILE.name();
+    } else if (outputFormatClass.equals(RCFileOutputFormat.class.getSimpleName())) {
+      return CatalogProtos.StoreType.RCFILE.name();
+    } else if (outputFormatClass.equals(DeprecatedParquetOutputFormat.class.getSimpleName())) {
+      return CatalogProtos.StoreType.PARQUET.name();
+    } else {
+      throw new CatalogException("Unsupported file output format. - file output format: " + fileFormat);
+    }
+  }
+
+  public static Table getTable(IMetaStoreClient client, String dbName, String tableName) throws TException {
+    return new Table(client.getTable(dbName, tableName));
+  }
+}
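Taken together, HiveCatalogUtil gives a two-way mapping between Hive and Tajo
types plus a store-type lookup. A minimal sketch of the round trip, for
illustration only (CatalogUtil.newSimpleDataType is assumed from Tajo's catalog
API, and the values merely exercise the branches above):

    // Hive "bigint" maps to Tajo INT8, and back.
    TajoDataTypes.Type tajoType = HiveCatalogUtil.getTajoFieldType(serdeConstants.BIGINT_TYPE_NAME);
    String hiveType = HiveCatalogUtil.getHiveFieldType(CatalogUtil.newSimpleDataType(TajoDataTypes.Type.INT8));
    // Hive's text output format class resolves to Tajo's TEXTFILE store type.
    String storeType = HiveCatalogUtil.getStoreType(HiveIgnoreKeyTextOutputFormat.class.getName());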