From: tarmstrong@apache.org
To: commits@impala.incubator.apache.org
Date: Sat, 22 Oct 2016 05:33:33 -0000
In-Reply-To: <633bbf8b926e46b6a678ef609d86c03d@git.apache.org>
References: <633bbf8b926e46b6a678ef609d86c03d@git.apache.org>
Subject: [06/14] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index d55f8da..d0185b7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -19,21 +19,19 @@
 package org.apache.impala.catalog;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 import javax.xml.bind.DatatypeConverter;
 
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.log4j.Logger;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.LocatedTablet;
-
+import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.DistributeParam;
+import org.apache.impala.analysis.ToSqlUtils;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TDistributeByHashParam;
+import org.apache.impala.thrift.TDistributeByRangeParam;
+import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TKuduTable;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
@@ -42,76 +40,86 @@ import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
 import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.TResultRowBuilder;
+import org.apache.impala.service.CatalogOpExecutor;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.client.KuduClient;
+import org.apache.kudu.client.KuduException;
+import org.apache.kudu.client.LocatedTablet;
+import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
+import org.apache.kudu.client.PartitionSchema.RangeSchema;
+import org.apache.kudu.client.PartitionSchema;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
 
 /**
- * Impala representation of a Kudu table.
- *
- * The Kudu-related metadata is stored in the Metastore table's table properties.
+ * Representation of a Kudu table in the catalog cache.
  */
 public class KuduTable extends Table {
-  private static final Logger LOG = Logger.getLogger(Table.class);
+
+  private static final Logger LOG = Logger.getLogger(KuduTable.class);
 
   // Alias to the string key that identifies the storage handler for Kudu tables.
   public static final String KEY_STORAGE_HANDLER =
       hive_metastoreConstants.META_TABLE_STORAGE;
 
-  // Key to access the table name from the table properties
+  // Key to access the table name from the table properties.
   public static final String KEY_TABLE_NAME = "kudu.table_name";
 
   // Key to access the columns used to build the (composite) key of the table.
-  // The order of the keys is important.
+  // Deprecated - used only for error checking.
  public static final String KEY_KEY_COLUMNS = "kudu.key_columns";
 
-  // Key to access the master address from the table properties. Error handling for
+  // Key to access the master host from the table properties. Error handling for
   // this string is done in the KuduClient library.
-  // TODO we should have something like KuduConfig.getDefaultConfig()
-  public static final String KEY_MASTER_ADDRESSES = "kudu.master_addresses";
+  // TODO: Renaming 'kudu.master_addresses' to 'kudu.master_hosts' would break
+  // compatibility with older versions.
+  public static final String KEY_MASTER_HOSTS = "kudu.master_addresses";
 
   // Kudu specific value for the storage handler table property keyed by
   // KEY_STORAGE_HANDLER.
+  // TODO: Fix the storage handler name (see IMPALA-4271).
   public static final String KUDU_STORAGE_HANDLER =
       "com.cloudera.kudu.hive.KuduStorageHandler";
 
   // Key to specify the number of tablet replicas.
-  // TODO(KUDU): Allow modification in alter table.
   public static final String KEY_TABLET_REPLICAS = "kudu.num_tablet_replicas";
 
   public static final long KUDU_RPC_TIMEOUT_MS = 50000;
 
-  // The name of the table in Kudu.
+  // Table name in the Kudu storage engine. It may not necessarily be the same as the
+  // table name specified in the CREATE TABLE statement; the latter
+  // is stored in Table.name_. Reasons why KuduTable.kuduTableName_ and Table.name_ may
+  // differ:
+  // 1. For managed tables, 'kuduTableName_' is prefixed with 'impala::' to
+  //    avoid conflicts. TODO: Remove this when Kudu supports databases.
+  // 2. The user may specify a table name using the 'kudu.table_name' table property.
   private String kuduTableName_;
 
   // Comma separated list of Kudu master hosts with optional ports.
   private String kuduMasters_;
 
-  // The set of columns that are key columns in Kudu.
-  private ImmutableList<String> kuduKeyColumnNames_;
+  // Primary key column names.
+  private final List<String> primaryKeyColumnNames_ = Lists.newArrayList();
+
+  // Distribution schemes of this Kudu table. Both range and hash-based distributions
+  // are supported.
+  private final List<DistributeParam> distributeBy_ = Lists.newArrayList();
 
   protected KuduTable(TableId id, org.apache.hadoop.hive.metastore.api.Table msTable,
       Db db, String name, String owner) {
     super(id, msTable, db, name, owner);
-  }
-
-  public TKuduTable getKuduTable() {
-    TKuduTable tbl = new TKuduTable();
-    tbl.setKey_columns(Preconditions.checkNotNull(kuduKeyColumnNames_));
-    tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
-    tbl.setTable_name(kuduTableName_);
-    return tbl;
-  }
-
-  @Override
-  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
-    TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
-        getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
-    desc.setKuduTable(getKuduTable());
-    return desc;
+    kuduTableName_ = msTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    kuduMasters_ = msTable.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
   }
 
   @Override
@@ -126,78 +134,149 @@ public class KuduTable extends Table {
 
   @Override
   public ArrayList<Column> getColumnsInHiveOrder() { return getColumns(); }
 
-  public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table mstbl) {
-    return KUDU_STORAGE_HANDLER.equals(mstbl.getParameters().get(KEY_STORAGE_HANDLER));
+  public static boolean isKuduTable(org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return KUDU_STORAGE_HANDLER.equals(msTbl.getParameters().get(KEY_STORAGE_HANDLER));
+  }
+
+  public String getKuduTableName() { return kuduTableName_; }
+  public String getKuduMasterHosts() { return kuduMasters_; }
+
+  public List<String> getPrimaryKeyColumnNames() {
+    return ImmutableList.copyOf(primaryKeyColumnNames_);
+  }
+
+  public List<DistributeParam> getDistributeBy() {
+    return ImmutableList.copyOf(distributeBy_);
   }
 
   /**
-   * Load the columns from the schema list
+   * Loads the metadata of a Kudu table.
+   *
+   * Schema and distribution schemes are loaded directly from Kudu whereas column stats
+   * are loaded from HMS. The function also updates the table schema in HMS in order to
+   * propagate alterations made to the Kudu table to HMS.
    */
-  private void loadColumns(List<FieldSchema> schema, IMetaStoreClient client,
-      Set<String> keyColumns) throws TableLoadingException {
+  @Override
+  public void load(boolean dummy /* not used */, IMetaStoreClient msClient,
+      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
+    msTable_ = msTbl;
+    // This is set to 0 for Kudu tables.
+    // TODO: Change this to reflect the number of pk columns and modify all the
+    // places (e.g. insert stmt) that currently make use of this parameter.
+    numClusteringCols_ = 0;
+    kuduTableName_ = msTable_.getParameters().get(KuduTable.KEY_TABLE_NAME);
+    Preconditions.checkNotNull(kuduTableName_);
+    kuduMasters_ = msTable_.getParameters().get(KuduTable.KEY_MASTER_HOSTS);
+    Preconditions.checkNotNull(kuduMasters_);
+    org.apache.kudu.client.KuduTable kuduTable = null;
+    numRows_ = getRowCount(msTable_.getParameters());
+
+    // Connect to Kudu to retrieve table metadata
+    try (KuduClient kuduClient = new KuduClient.KuduClientBuilder(
+        getKuduMasterHosts()).build()) {
+      kuduTable = kuduClient.openTable(kuduTableName_);
+    } catch (KuduException e) {
+      LOG.error("Error accessing Kudu table " + kuduTableName_);
+      throw new TableLoadingException(e.getMessage());
+    }
+    Preconditions.checkNotNull(kuduTable);
+
+    // Load metadata from Kudu and HMS
+    try {
+      loadSchema(kuduTable);
+      loadDistributeByParams(kuduTable);
+      loadAllColumnStats(msClient);
+    } catch (ImpalaRuntimeException e) {
+      LOG.error("Error loading metadata for Kudu table: " + kuduTableName_);
+      throw new TableLoadingException("Error loading metadata for Kudu table "
+          + kuduTableName_, e);
+    }
 
-    if (keyColumns.size() == 0 || keyColumns.size() > schema.size()) {
-      throw new TableLoadingException(String.format("Kudu tables must have at least one"
-          + "key column (had %d), and no more key columns than there are table columns "
-          + "(had %d).", keyColumns.size(), schema.size()));
+    // Update the table schema in HMS.
+    try {
+      long lastDdlTime = CatalogOpExecutor.calculateDdlTime(msTable_);
+      msTable_.putToParameters("transient_lastDdlTime", Long.toString(lastDdlTime));
+      msTable_.putToParameters(StatsSetupConst.DO_NOT_UPDATE_STATS,
+          StatsSetupConst.TRUE);
+      msClient.alter_table(msTable_.getDbName(), msTable_.getTableName(), msTable_);
+    } catch (TException e) {
+      throw new TableLoadingException(e.getMessage());
     }
+  }
 
+  /**
+   * Loads the schema from the Kudu table including column definitions and primary key
+   * columns. Replaces the columns in the HMS table with the columns from the Kudu
+   * table. Throws an ImpalaRuntimeException if Kudu column data types cannot be mapped
+   * to Impala data types.
+   */
+  private void loadSchema(org.apache.kudu.client.KuduTable kuduTable)
+      throws ImpalaRuntimeException {
+    Preconditions.checkNotNull(kuduTable);
     clearColumns();
-    Set<String> columnNames = Sets.newHashSet();
+    primaryKeyColumnNames_.clear();
+    List<FieldSchema> cols = msTable_.getSd().getCols();
+    cols.clear();
     int pos = 0;
-    for (FieldSchema field: schema) {
-      org.apache.impala.catalog.Type type = parseColumnType(field);
-      // TODO(kudu-merge): Check for decimal types?
-      boolean isKey = keyColumns.contains(field.getName());
-      KuduColumn col = new KuduColumn(field.getName(), isKey, !isKey, type,
-          field.getComment(), pos);
-      columnNames.add(col.getName());
-      addColumn(col);
+    for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) {
+      Type type = KuduUtil.toImpalaType(colSchema.getType());
+      String colName = colSchema.getName();
+      cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null));
+      boolean isKey = colSchema.isKey();
+      if (isKey) primaryKeyColumnNames_.add(colName);
+      addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos));
       ++pos;
     }
+  }
 
-    if (!columnNames.containsAll(keyColumns)) {
-      throw new TableLoadingException(String.format("Some key columns were not found in"
-          + " the set of columns. List of column names: %s, List of key column names:"
-          + " %s", Iterables.toString(columnNames), Iterables.toString(keyColumns)));
+  private void loadDistributeByParams(org.apache.kudu.client.KuduTable kuduTable) {
+    Preconditions.checkNotNull(kuduTable);
+    PartitionSchema partitionSchema = kuduTable.getPartitionSchema();
+    Preconditions.checkState(!colsByPos_.isEmpty());
+    distributeBy_.clear();
+    for (HashBucketSchema hashBucketSchema: partitionSchema.getHashBucketSchemas()) {
+      List<String> columnNames = Lists.newArrayList();
+      for (int colPos: hashBucketSchema.getColumnIds()) {
+        columnNames.add(colsByPos_.get(colPos).getName());
+      }
+      distributeBy_.add(
+          DistributeParam.createHashParam(columnNames, hashBucketSchema.getNumBuckets()));
     }
-
-    kuduKeyColumnNames_ = ImmutableList.copyOf(keyColumns);
-
-    loadAllColumnStats(client);
+    RangeSchema rangeSchema = partitionSchema.getRangeSchema();
+    List<Integer> columnIds = rangeSchema.getColumns();
+    if (columnIds.isEmpty()) return;
+    List<String> columnNames = Lists.newArrayList();
+    for (int colPos: columnIds) columnNames.add(colsByPos_.get(colPos).getName());
+    // We don't populate the split values because Kudu's API doesn't currently support
+    // retrieving the split values for range partitions.
+    // TODO: File a Kudu JIRA.
+    distributeBy_.add(DistributeParam.createRangeParam(columnNames, null));
   }
 
-  @Override
-  public void load(boolean reuseMetadata, IMetaStoreClient client,
-      org.apache.hadoop.hive.metastore.api.Table msTbl) throws TableLoadingException {
-    // TODO handle 'reuseMetadata'
-    if (getMetaStoreTable() == null || !tableParamsAreValid(msTbl.getParameters())) {
-      throw new TableLoadingException(String.format(
-          "Cannot load Kudu table %s, table is corrupt.", name_));
+  /**
+   * Creates a temporary KuduTable object populated with the specified properties.
+   * The returned table has an invalid TableId and is not added to the Kudu storage
+   * engine or the HMS. This is used for CTAS statements.
+   */
+  public static KuduTable createCtasTarget(Db db,
+      org.apache.hadoop.hive.metastore.api.Table msTbl, List<ColumnDef> columnDefs,
+      List<String> primaryKeyColumnNames, List<DistributeParam> distributeParams) {
+    KuduTable tmpTable = new KuduTable(TableId.createInvalidId(), msTbl, db,
+        msTbl.getTableName(), msTbl.getOwner());
+    int pos = 0;
+    for (ColumnDef colDef: columnDefs) {
+      tmpTable.addColumn(new Column(colDef.getColName(), colDef.getType(), pos++));
     }
-
-    msTable_ = msTbl;
-    kuduTableName_ = msTbl.getParameters().get(KEY_TABLE_NAME);
-    kuduMasters_ = msTbl.getParameters().get(KEY_MASTER_ADDRESSES);
-
-    String keyColumnsProp = Preconditions.checkNotNull(msTbl.getParameters()
-        .get(KEY_KEY_COLUMNS).toLowerCase(), "'kudu.key_columns' cannot be null.");
-    Set<String> keyColumns = KuduUtil.parseKeyColumns(keyColumnsProp);
-
-    // Load the rest of the data from the table parameters directly
-    loadColumns(msTbl.getSd().getCols(), client, keyColumns);
-
-    numClusteringCols_ = 0;
-
-    // Get row count from stats
-    numRows_ = getRowCount(getMetaStoreTable().getParameters());
+    tmpTable.primaryKeyColumnNames_.addAll(primaryKeyColumnNames);
+    tmpTable.distributeBy_.addAll(distributeParams);
+    return tmpTable;
   }
 
   @Override
   public TTable toThrift() {
     TTable table = super.toThrift();
     table.setTable_type(TTableType.KUDU_TABLE);
-    table.setKudu_table(getKuduTable());
+    table.setKudu_table(getTKuduTable());
     return table;
   }
 
@@ -207,33 +286,46 @@ public class KuduTable extends Table {
     TKuduTable tkudu = thriftTable.getKudu_table();
     kuduTableName_ = tkudu.getTable_name();
     kuduMasters_ = Joiner.on(',').join(tkudu.getMaster_addresses());
-    kuduKeyColumnNames_ = ImmutableList.copyOf(tkudu.getKey_columns());
+    primaryKeyColumnNames_.clear();
+    primaryKeyColumnNames_.addAll(tkudu.getKey_columns());
+    loadDistributeByParamsFromThrift(tkudu.getDistribute_by());
   }
 
-  public String getKuduTableName() { return kuduTableName_; }
-  public String getKuduMasterAddresses() { return kuduMasters_; }
-  public int getNumKeyColumns() { return kuduKeyColumnNames_.size(); }
-
-  /**
-   * Returns true if all required parameters are present in the given table properties
-   * map.
-   * TODO(kudu-merge) Return a more specific error string.
-   */
-  public static boolean tableParamsAreValid(Map<String, String> params) {
-    return params.get(KEY_TABLE_NAME) != null && params.get(KEY_TABLE_NAME).length() > 0
-        && params.get(KEY_MASTER_ADDRESSES) != null
-        && params.get(KEY_MASTER_ADDRESSES).length() > 0
-        && params.get(KEY_KEY_COLUMNS) != null
-        && params.get(KEY_KEY_COLUMNS).length() > 0;
-  }
+  private void loadDistributeByParamsFromThrift(List<TDistributeParam> params) {
+    distributeBy_.clear();
+    for (TDistributeParam param: params) {
+      if (param.isSetBy_hash_param()) {
+        TDistributeByHashParam hashParam = param.getBy_hash_param();
+        distributeBy_.add(DistributeParam.createHashParam(
+            hashParam.getColumns(), hashParam.getNum_buckets()));
+      } else {
+        Preconditions.checkState(param.isSetBy_range_param());
+        TDistributeByRangeParam rangeParam = param.getBy_range_param();
+        distributeBy_.add(DistributeParam.createRangeParam(rangeParam.getColumns(),
+            null));
+      }
+    }
+  }
 
-  /**
-   * The number of nodes is not known ahead of time and will be updated during
-   * computeStats in the scan node.
-   */
-  public int getNumNodes() { return -1; }
+  @Override
+  public TTableDescriptor toThriftDescriptor(Set<Long> referencedPartitions) {
+    TTableDescriptor desc = new TTableDescriptor(id_.asInt(), TTableType.KUDU_TABLE,
+        getTColumnDescriptors(), numClusteringCols_, kuduTableName_, db_.getName());
+    desc.setKuduTable(getTKuduTable());
+    return desc;
+  }
 
-  public List<String> getKuduKeyColumnNames() { return kuduKeyColumnNames_; }
+  private TKuduTable getTKuduTable() {
+    TKuduTable tbl = new TKuduTable();
+    tbl.setKey_columns(Preconditions.checkNotNull(primaryKeyColumnNames_));
+    tbl.setMaster_addresses(Lists.newArrayList(kuduMasters_.split(",")));
+    tbl.setTable_name(kuduTableName_);
+    Preconditions.checkNotNull(distributeBy_);
+    for (DistributeParam distributeParam: distributeBy_) {
+      tbl.addToDistribute_by(distributeParam.toThrift());
+    }
+    return tbl;
+  }
 
   public TResultSet getTableStats() throws ImpalaRuntimeException {
     TResultSet result = new TResultSet();
@@ -247,7 +339,7 @@ public class KuduTable extends Table {
     resultSchema.addToColumns(new TColumn("# Replicas", Type.INT.toThrift()));
 
     try (KuduClient client = new KuduClient.KuduClientBuilder(
-        getKuduMasterAddresses()).build()) {
+        getKuduMasterHosts()).build()) {
       org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName_);
       List<LocatedTablet> tablets =
           kuduTable.getTabletsLocations(KUDU_RPC_TIMEOUT_MS);
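As context for the new load() path above: the schema and partitioning are now read
directly from Kudu rather than from HMS table properties. Below is a minimal,
self-contained sketch (not part of the patch) of that client interaction, using only
Kudu client APIs that appear in this diff; the master address and table name are
illustrative assumptions.

import org.apache.kudu.ColumnSchema;
import org.apache.kudu.client.KuduClient;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.PartitionSchema;
import org.apache.kudu.client.PartitionSchema.HashBucketSchema;

public class KuduMetadataSketch {
  public static void main(String[] args) throws KuduException {
    // Hypothetical master address and table name; real values come from the
    // 'kudu.master_addresses' and 'kudu.table_name' table properties.
    try (KuduClient client =
        new KuduClient.KuduClientBuilder("localhost:7051").build()) {
      org.apache.kudu.client.KuduTable t = client.openTable("impala::db.tbl");
      // Column definitions and primary-key flags come straight from the Kudu schema,
      // mirroring KuduTable.loadSchema().
      for (ColumnSchema col : t.getSchema().getColumns()) {
        System.out.println(col.getName() + " key=" + col.isKey());
      }
      // Hash-bucketing metadata drives the DISTRIBUTE BY reconstruction in
      // loadDistributeByParams().
      PartitionSchema ps = t.getPartitionSchema();
      for (HashBucketSchema hbs : ps.getHashBucketSchemas()) {
        System.out.println("hash buckets: " + hbs.getNumBuckets());
      }
    }
  }
}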
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 6145cc5..4b40b44 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -455,6 +455,11 @@ public abstract class Table implements CatalogObject {
   @Override
   public boolean isLoaded() { return true; }
 
+  public static boolean isExternalTable(
+      org.apache.hadoop.hive.metastore.api.Table msTbl) {
+    return msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString());
+  }
+
   /**
    * If the table is cached, it returns a <cache directive ID, pool name> pair
    * and adds the table cached directive ID to 'cacheDirIds'. Otherwise, it
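The new Table.isExternalTable() helper centralizes the managed-vs-external check that
the rest of this commit relies on when deciding whether to touch the Kudu storage
engine. A hedged sketch exercising the same check against a synthetic HMS table
object (the table instance here is fabricated for illustration):

import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.Table;

class IsExternalSketch {
  public static void main(String[] args) {
    Table msTbl = new Table();
    msTbl.setTableType(TableType.EXTERNAL_TABLE.toString());
    // equalsIgnoreCase guards against case differences in the stored string.
    System.out.println(
        msTbl.getTableType().equalsIgnoreCase(TableType.EXTERNAL_TABLE.toString()));
  }
}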
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
index 764abe0..8541a3a 100644
--- a/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/TableLoader.java
@@ -18,7 +18,6 @@
 package org.apache.impala.catalog;
 
 import java.util.EnumSet;
-import java.util.Set;
 
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/Type.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Type.java b/fe/src/main/java/org/apache/impala/catalog/Type.java
index 91fc2e3..05c71c7 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Type.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Type.java
@@ -317,6 +317,15 @@ public abstract class Type {
   }
 
   /**
+   * Checks if types t1 and t2 are assignment compatible, i.e. if both t1 and t2 can be
+   * assigned to a type t without an explicit cast and without any conversions that
+   * would result in loss of precision.
+   */
+  public static boolean areAssignmentCompatibleTypes(Type t1, Type t2) {
+    return getAssignmentCompatibleType(t1, t2, true) != ScalarType.INVALID;
+  }
+
+  /**
    * Returns true if this type exceeds the MAX_NESTING_DEPTH, false otherwise.
    */
   public boolean exceedsMaxNestingDepth() { return exceedsMaxNestingDepth(0); }
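The new helper treats two types as compatible only if a common type exists that both
can be assigned to without precision loss (the 'true' argument selects strict mode).
An illustrative sketch of expected behavior; the concrete results are the editor's
reading of the strict compatibility rules, not asserted by the patch:

import org.apache.impala.catalog.Type;

class TypeCompatSketch {
  public static void main(String[] args) {
    // INT widens to BIGINT with no precision loss, so this should print true.
    System.out.println(Type.areAssignmentCompatibleTypes(Type.INT, Type.BIGINT));
    // FLOAT cannot represent every BIGINT value exactly, so under strict rules
    // this is expected to print false.
    System.out.println(Type.areAssignmentCompatibleTypes(Type.BIGINT, Type.FLOAT));
  }
}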
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
deleted file mode 100644
index 6c3ba8e..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/DdlDelegate.java
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import java.util.List;
-
-import org.apache.impala.thrift.TDistributeParam;
-import org.apache.hadoop.hive.metastore.api.Table;
-
-import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.thrift.TAlterTableParams;
-
-/**
- * Abstract class to implement the storage specific portion of DDL requests.
- *
- * During catalog DDL operations the CatalogOpExecutor will instantiate the correct
- * subclass of this class to handle the DDL operation to the storage backend. See,
- * CatalogOpExecutor::createDDLDelegate() for details.
- *
- */
-public abstract class DdlDelegate {
-
-  protected Table msTbl_;
-  protected TAlterTableParams tAlterTableParams_;
-  protected List<TDistributeParam> distributeParams_;
-
-  /**
-   * Creates a new delegate to modify Table 'msTbl'.
-   */
-  public DdlDelegate setMsTbl(Table msTbl) {
-    msTbl_ = msTbl;
-    return this;
-  }
-
-  public DdlDelegate setAlterTableParams(TAlterTableParams p) {
-    tAlterTableParams_ = p;
-    return this;
-  }
-
-  public DdlDelegate setDistributeParams(List<TDistributeParam> p) {
-    distributeParams_ = p;
-    return this;
-  }
-
-  /**
-   * Creates the table.
-   */
-  public abstract void createTable() throws ImpalaRuntimeException;
-
-  /**
-   * Drops the table.
-   */
-  public abstract void dropTable() throws ImpalaRuntimeException;
-
-  /**
-   * Performs an alter table with the parameters set with setAlterTableParams().
-   */
-  public abstract boolean alterTable() throws ImpalaRuntimeException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
deleted file mode 100644
index 8410868..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/KuduDdlDelegate.java
+++ /dev/null
@@ -1,190 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import static org.apache.impala.util.KuduUtil.compareSchema;
-import static org.apache.impala.util.KuduUtil.fromImpalaType;
-import static org.apache.impala.util.KuduUtil.parseKeyColumns;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder;
-import org.apache.kudu.Schema;
-import org.apache.kudu.Type;
-import org.apache.kudu.client.CreateTableOptions;
-import org.apache.kudu.client.KuduClient;
-import org.apache.kudu.client.PartialRow;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.impala.catalog.KuduTable;
-import org.apache.impala.common.ImpalaRuntimeException;
-import org.apache.impala.thrift.TDistributeParam;
-import org.apache.impala.util.KuduUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
-
-/**
- * Implementation of the Kudu DDL Delegate. Propagates create and drop table statements
- * to Kudu.
- */
-public class KuduDdlDelegate extends DdlDelegate {
-
-  private static final Logger LOG = LoggerFactory.getLogger(KuduDdlDelegate.class);
-
-  public KuduDdlDelegate(Table msTbl) {
-    setMsTbl(msTbl);
-  }
-
-  /**
-   * Creates the Kudu table if it does not exist and returns true. If the table exists
-   * and the table is not a managed table ignore and return false, otherwise throw an
-   * exception.
-   */
-  @Override
-  public void createTable()
-      throws ImpalaRuntimeException {
-
-    String kuduTableName = msTbl_.getParameters().get(KuduTable.KEY_TABLE_NAME);
-    String kuduMasters = msTbl_.getParameters().get(KuduTable.KEY_MASTER_ADDRESSES);
-
-    // Can be optional for un-managed tables
-    String kuduKeyCols = msTbl_.getParameters().get(KuduTable.KEY_KEY_COLUMNS);
-
-    String replication = msTbl_.getParameters().get(KuduTable.KEY_TABLET_REPLICAS);
-
-    try (KuduClient client = new KuduClient.KuduClientBuilder(kuduMasters).build()) {
-      // TODO should we throw if the table does not exist when its an external table?
-      if (client.tableExists(kuduTableName)) {
-        if (msTbl_.getTableType().equals(TableType.MANAGED_TABLE.toString())) {
-          throw new ImpalaRuntimeException(String.format(
-              "Table %s already exists in Kudu master %s.", kuduTableName, kuduMasters));
-        }
-
-        // Check if the external table matches the schema
-        org.apache.kudu.client.KuduTable kuduTable = client.openTable(kuduTableName);
-        if (!compareSchema(msTbl_, kuduTable)) {
-          throw new ImpalaRuntimeException(String.format(
-              "Table %s (%s) has a different schema in Kudu than in Hive.",
-              msTbl_.getTableName(), kuduTableName));
-        }
-        return;
-      }
-
-      HashSet<String> keyColNames = parseKeyColumns(kuduKeyCols);
-      List<ColumnSchema> keyColSchemas = new ArrayList<>();
-
-      // Create a new Schema and map the types accordingly
-      ArrayList<ColumnSchema> columns = Lists.newArrayList();
-      for (FieldSchema fieldSchema: msTbl_.getSd().getCols()) {
-        org.apache.impala.catalog.Type catalogType = org.apache.impala.catalog.Type
-            .parseColumnType(fieldSchema.getType());
-        if (catalogType == null) {
-          throw new ImpalaRuntimeException(String.format(
-              "Could not parse column type %s.", fieldSchema.getType()));
-        }
-        Type t = fromImpalaType(catalogType);
-        // Create the actual column and check if the column is a key column
-        ColumnSchemaBuilder csb = new ColumnSchemaBuilder(
-            fieldSchema.getName(), t);
-        boolean isKeyColumn = keyColNames.contains(fieldSchema.getName());
-        csb.key(isKeyColumn);
-        csb.nullable(!isKeyColumn);
-        ColumnSchema cs = csb.build();
-        columns.add(cs);
-        if (isKeyColumn) keyColSchemas.add(cs);
-      }
-
-      Schema schema = new Schema(columns);
-      CreateTableOptions cto = new CreateTableOptions();
-
-      // Handle auto-partitioning of the Kudu table
-      if (distributeParams_ != null) {
-        for (TDistributeParam param : distributeParams_) {
-          if (param.isSetBy_hash_param()) {
-            Preconditions.checkState(!param.isSetBy_range_param());
-            cto.addHashPartitions(param.getBy_hash_param().getColumns(),
-                param.getBy_hash_param().getNum_buckets());
-          } else {
-            Preconditions.checkState(param.isSetBy_range_param());
-            cto.setRangePartitionColumns(param.getBy_range_param().getColumns());
-            for (PartialRow p : KuduUtil.parseSplits(schema, param.getBy_range_param())) {
-              cto.addSplitRow(p);
-            }
-          }
-        }
-      }
-
-      if (!Strings.isNullOrEmpty(replication)) {
-        int r = Integer.parseInt(replication);
-        if (r <= 0) {
-          throw new ImpalaRuntimeException(
-              "Number of tablet replicas must be greater than zero. " +
-              "Given number of replicas is: " + Integer.toString(r));
-        }
-        cto.setNumReplicas(r);
-      }
-
-      client.createTable(kuduTableName, schema, cto);
-    } catch (ImpalaRuntimeException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new ImpalaRuntimeException("Error creating Kudu table", e);
-    }
-  }
-
-  @Override
-  public void dropTable() throws ImpalaRuntimeException {
-    // If table is an external table, do not delete the data
-    if (msTbl_.getTableType().equals(TableType.EXTERNAL_TABLE.toString())) return;
-
-    String kuduTableName = msTbl_.getParameters().get(KuduTable.KEY_TABLE_NAME);
-    String kuduMasters = msTbl_.getParameters().get(KuduTable.KEY_MASTER_ADDRESSES);
-
-    try (KuduClient client = new KuduClient.KuduClientBuilder(kuduMasters).build()) {
-      if (!client.tableExists(kuduTableName)) {
-        LOG.warn("Table: %s is in inconsistent state. It does not exist in Kudu master(s)"
-            + " %s, but it exists in Hive metastore. Deleting from metastore only.",
-            kuduTableName, kuduMasters);
-        return;
-      }
-      client.deleteTable(kuduTableName);
-      return;
-    } catch (Exception e) {
-      throw new ImpalaRuntimeException("Error dropping Kudu table", e);
-    }
-  }
-
-  public static boolean canHandle(org.apache.hadoop.hive.metastore.api.Table msTbl) {
-    return KuduTable.isKuduTable(msTbl);
-  }
-
-  @Override
-  public boolean alterTable() throws ImpalaRuntimeException {
-    throw new ImpalaRuntimeException(
-        "Alter table operations are not supported for Kudu tables.");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java b/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
deleted file mode 100644
index 8aabaa4..0000000
--- a/fe/src/main/java/org/apache/impala/catalog/delegates/UnsupportedOpDelegate.java
+++ /dev/null
@@ -1,35 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.impala.catalog.delegates;
-
-import org.apache.impala.common.ImpalaRuntimeException;
-
-/**
- * Empty implementation for the DdlDelegate interface that does nothing.
- */
-public class UnsupportedOpDelegate extends DdlDelegate {
-
-  @Override
-  public void createTable() throws ImpalaRuntimeException { }
-
-  @Override
-  public void dropTable() throws ImpalaRuntimeException { }
-
-  @Override
-  public boolean alterTable() throws ImpalaRuntimeException { return true; }
-}
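With the DdlDelegate hierarchy deleted, Kudu DDL is dispatched directly from
CatalogOpExecutor to the new KuduCatalogOpExecutor helper (introduced elsewhere in
this commit). A hedged sketch of the dispatch that replaces createDdlDelegate(); the
dropTable(Table, boolean) signature follows the calls visible later in this patch,
and the package of KuduCatalogOpExecutor is assumed to match CatalogOpExecutor's:

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.catalog.KuduTable;
import org.apache.impala.service.KuduCatalogOpExecutor;

class DropDispatchSketch {
  // Drops the backing Kudu table only for managed (non-external) Kudu tables.
  static void dropIfManagedKudu(Table msTbl) throws Exception {
    if (KuduTable.isKuduTable(msTbl)
        && !org.apache.impala.catalog.Table.isExternalTable(msTbl)) {
      KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
    }
  }
}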
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
index 3345c1b..8d15425 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsPartitionFilter.java
@@ -52,7 +52,7 @@ public class HdfsPartitionFilter {
   // lhs exprs of smap used in isMatch()
   private final ArrayList<SlotRef> lhsSlotRefs_ = Lists.newArrayList();
 
-  // indices into Table.getColumns()
+  // indices into Table.getColumnNames()
   private final ArrayList<Integer> refdKeys_ = Lists.newArrayList();
 
   public HdfsPartitionFilter(Expr predicate, HdfsTable tbl, Analyzer analyzer) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
index 64ef822..9434801 100644
--- a/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/KuduScanNode.java
@@ -107,7 +107,7 @@ public class KuduScanNode extends ScanNode {
     conjuncts_ = orderConjunctsByCost(conjuncts_);
 
     try (KuduClient client =
-        new KuduClientBuilder(kuduTable_.getKuduMasterAddresses()).build()) {
+        new KuduClientBuilder(kuduTable_.getKuduMasterHosts()).build()) {
       org.apache.kudu.client.KuduTable rpcTable =
           client.openTable(kuduTable_.getKuduTableName());
       validateSchema(rpcTable);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
index 5743a59..54493d1 100644
--- a/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
@@ -51,6 +51,11 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.StringColumnStatsData;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import org.apache.impala.analysis.FunctionName;
 import org.apache.impala.analysis.TableName;
@@ -61,7 +66,6 @@ import org.apache.impala.catalog.CatalogServiceCatalog;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.ColumnNotFoundException;
 import org.apache.impala.catalog.DataSource;
-import org.apache.impala.catalog.DatabaseNotFoundException;
 import org.apache.impala.catalog.Db;
 import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
@@ -70,6 +74,7 @@ import org.apache.impala.catalog.HdfsPartition;
 import org.apache.impala.catalog.HdfsTable;
 import org.apache.impala.catalog.HiveStorageDescriptorFactory;
 import org.apache.impala.catalog.IncompleteTable;
+import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.catalog.PartitionNotFoundException;
 import org.apache.impala.catalog.PartitionStatsUtil;
@@ -82,9 +87,6 @@ import org.apache.impala.catalog.TableLoadingException;
 import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.catalog.View;
-import org.apache.impala.catalog.delegates.DdlDelegate;
-import org.apache.impala.catalog.delegates.KuduDdlDelegate;
-import org.apache.impala.catalog.delegates.UnsupportedOpDelegate;
 import org.apache.impala.common.FileSystemUtil;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.ImpalaRuntimeException;
@@ -121,7 +123,6 @@ import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDatabase;
 import org.apache.impala.thrift.TDdlExecRequest;
 import org.apache.impala.thrift.TDdlExecResponse;
-import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TDropDataSourceParams;
 import org.apache.impala.thrift.TDropDbParams;
 import org.apache.impala.thrift.TDropFunctionParams;
@@ -149,11 +150,6 @@ import org.apache.impala.thrift.TTruncateParams;
 import org.apache.impala.thrift.TUpdateCatalogRequest;
 import org.apache.impala.thrift.TUpdateCatalogResponse;
 import org.apache.impala.util.HdfsCachingUtil;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 /**
  * Class used to execute Catalog Operations, including DDL and refresh/invalidate
@@ -1103,8 +1099,7 @@ public class CatalogOpExecutor {
 
   /**
    * Drops a database from the metastore and removes the database's metadata from the
-   * internal cache. Re-throws any Hive Meta Store exceptions encountered during
-   * the drop.
+   * internal cache. Re-throws any HMS exceptions encountered during the drop.
    */
   private void dropDatabase(TDropDbParams params, TDdlExecResponse resp)
       throws ImpalaException {
@@ -1120,6 +1115,9 @@ public class CatalogOpExecutor {
 
     TCatalogObject removedObject = new TCatalogObject();
     synchronized (metastoreDdlLock_) {
+      // Remove all the Kudu tables of 'db' from the Kudu storage engine.
+      if (db != null && params.cascade) dropTablesFromKudu(db);
+
       try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
         msClient.getHiveClient().dropDatabase(
             params.getDb(), true, params.if_exists, params.cascade);
@@ -1144,6 +1142,44 @@ public class CatalogOpExecutor {
   }
 
   /**
+   * Drops all the Kudu tables of database 'db' from the Kudu storage engine. Retrieves
+   * the Kudu table name of each table in 'db' from HMS. Throws an ImpalaException if
+   * metadata for Kudu tables cannot be loaded from HMS or if an error occurs while
+   * trying to drop a table from Kudu.
+   */
+  private void dropTablesFromKudu(Db db) throws ImpalaException {
+    // If the table format isn't available, because the table hasn't been loaded yet,
+    // the metadata must be fetched from the Hive Metastore.
+    List<String> incompleteTableNames = Lists.newArrayList();
+    List<org.apache.hadoop.hive.metastore.api.Table> msTables = Lists.newArrayList();
+    for (Table table: db.getTables()) {
+      org.apache.hadoop.hive.metastore.api.Table msTable = table.getMetaStoreTable();
+      if (msTable == null) {
+        incompleteTableNames.add(table.getName());
+      } else {
+        msTables.add(msTable);
+      }
+    }
+    if (!incompleteTableNames.isEmpty()) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        msTables.addAll(msClient.getHiveClient().getTableObjectsByName(
+            db.getName(), incompleteTableNames));
+      } catch (TException e) {
+        LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTableObjectsByName") +
+            e.getMessage());
+      }
+    }
+    for (org.apache.hadoop.hive.metastore.api.Table msTable: msTables) {
+      if (!KuduTable.isKuduTable(msTable) || Table.isExternalTable(msTable)) continue;
+      // The operation will be aborted if the Kudu table cannot be dropped. If for
+      // some reason Kudu is permanently stuck in a non-functional state, the user is
+      // expected to ALTER TABLE to either set the table to UNMANAGED or set the format
+      // to something else.
+      KuduCatalogOpExecutor.dropTable(msTable, /*if exists*/ true);
+    }
+  }
+
+  /**
    * Drops a table or view from the metastore and removes it from the catalog.
    * Also drops all associated caching requests on the table and/or table's partitions,
    * uncaching all table data. If params.purge is true, table data is permanently
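Tables that have not been loaded yet carry no cached HMS object, so dropTablesFromKudu
batches one metastore RPC for all of them instead of one getTable() call per table. A
minimal sketch of that HMS call (not from the patch); 'msClient' is assumed to be an
already-connected IMetaStoreClient and the database and table names are illustrative:

import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;

class BatchFetchSketch {
  static List<Table> fetchUnloaded(IMetaStoreClient msClient) throws TException {
    // One RPC returns the full HMS objects for every listed table.
    return msClient.getTableObjectsByName("db1", Arrays.asList("t1", "t2"));
  }
}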
@@ -1157,17 +1193,6 @@ public class CatalogOpExecutor {
 
     TCatalogObject removedObject = new TCatalogObject();
     synchronized (metastoreDdlLock_) {
-
-      // Forward the DDL operation to the specified storage backend.
-      try {
-        org.apache.hadoop.hive.metastore.api.Table msTbl = getExistingTable(
-            tableName.getDb(), tableName.getTbl()).getMetaStoreTable();
-        DdlDelegate handler = createDdlDelegate(msTbl);
-        handler.dropTable();
-      } catch (TableNotFoundException | DatabaseNotFoundException e) {
-        // Do nothing
-      }
-
       Db db = catalog_.getDb(params.getTable_name().db_name);
       if (db == null) {
         if (params.if_exists) return;
@@ -1179,6 +1204,23 @@ public class CatalogOpExecutor {
         if (params.if_exists) return;
         throw new CatalogException("Table/View does not exist: " + tableName);
       }
+
+      // Retrieve the HMS table to determine if this is a Kudu table.
+      org.apache.hadoop.hive.metastore.api.Table msTbl = existingTbl.getMetaStoreTable();
+      if (msTbl == null) {
+        Preconditions.checkState(existingTbl instanceof IncompleteTable);
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          msTbl = msClient.getHiveClient().getTable(tableName.getDb(),
+              tableName.getTbl());
+        } catch (TException e) {
+          LOG.error(String.format(HMS_RPC_ERROR_FORMAT_STR, "getTable") + e.getMessage());
+        }
+      }
+      if (msTbl != null && KuduTable.isKuduTable(msTbl)
+          && !Table.isExternalTable(msTbl)) {
+        KuduCatalogOpExecutor.dropTable(msTbl, /* if exists */ true);
+      }
+
       // Check to make sure we don't drop a view with "drop table" statement and
       // vice versa. is_table field is marked optional in TDropTableOrViewParams to
       // maintain catalog api compatibility.
@@ -1343,7 +1385,8 @@ public class CatalogOpExecutor {
 
   /**
    * Creates a new table in the metastore and adds an entry to the metadata cache to
-   * lazily load the new metadata on the next access. Re-throws any Hive Meta Store
+   * lazily load the new metadata on the next access. If this is a managed Kudu table,
+   * the table is also created in the Kudu storage engine. Re-throws any HMS or Kudu
    * exceptions encountered during the create.
    */
   private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
@@ -1351,9 +1394,8 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(params);
     TableName tableName = TableName.fromThrift(params.getTable_name());
     Preconditions.checkState(tableName != null && tableName.isFullyQualified());
-    Preconditions.checkState(params.getColumns() != null &&
-        params.getColumns().size() > 0,
-        "Null or empty column list given as argument to Catalog.createTable");
+    Preconditions.checkState(params.getColumns() != null,
+        "Null column list given as argument to Catalog.createTable");
 
     if (params.if_not_exists &&
         catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
@@ -1362,11 +1404,161 @@ public class CatalogOpExecutor {
       response.getResult().setVersion(catalog_.getCatalogVersion());
       return false;
     }
-    org.apache.hadoop.hive.metastore.api.Table tbl =
-        createMetaStoreTable(params);
+    org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
     LOG.debug(String.format("Creating table %s", tableName));
-    return createTable(tbl, params.if_not_exists, params.getCache_op(),
-        params.getDistribute_by(), response);
+    if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
+    Preconditions.checkState(params.getColumns().size() > 0,
+        "Empty column list given as argument to Catalog.createTable");
+    return createTable(tbl, params.if_not_exists, params.getCache_op(), response);
+  }
+
+  /**
+   * Utility function that creates a hive.metastore.api.Table object based on the given
+   * TCreateTableParams.
+   * TODO: Extract metastore object creation utility functions into a separate
+   * helper/factory class.
+   */
+  public static org.apache.hadoop.hive.metastore.api.Table createMetaStoreTable(
+      TCreateTableParams params) {
+    Preconditions.checkNotNull(params);
+    TableName tableName = TableName.fromThrift(params.getTable_name());
+    org.apache.hadoop.hive.metastore.api.Table tbl =
+        new org.apache.hadoop.hive.metastore.api.Table();
+    tbl.setDbName(tableName.getDb());
+    tbl.setTableName(tableName.getTbl());
+    tbl.setOwner(params.getOwner());
+    if (params.isSetTable_properties()) {
+      tbl.setParameters(params.getTable_properties());
+    } else {
+      tbl.setParameters(new HashMap<String, String>());
+    }
+
+    if (params.getComment() != null) {
+      tbl.getParameters().put("comment", params.getComment());
+    }
+    if (params.is_external) {
+      tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+      tbl.putToParameters("EXTERNAL", "TRUE");
+    } else {
+      tbl.setTableType(TableType.MANAGED_TABLE.toString());
+    }
+
+    tbl.setSd(createSd(params));
+    if (params.getPartition_columns() != null) {
+      // Add in any partition keys that were specified
+      tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns()));
+    } else {
+      tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+    }
+    return tbl;
+  }
+
+  private static StorageDescriptor createSd(TCreateTableParams params) {
+    StorageDescriptor sd = HiveStorageDescriptorFactory.createSd(
+        params.getFile_format(), RowFormat.fromThrift(params.getRow_format()));
+    if (params.isSetSerde_properties()) {
+      if (sd.getSerdeInfo().getParameters() == null) {
+        sd.getSerdeInfo().setParameters(params.getSerde_properties());
+      } else {
+        sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties());
+      }
+    }
+
+    if (params.getLocation() != null) sd.setLocation(params.getLocation());
+
+    // Add in all the columns
+    sd.setCols(buildFieldSchemaList(params.getColumns()));
+    return sd;
+  }
+
+  /**
+   * Creates a new Kudu table. The Kudu table is first created in the Kudu storage
+   * engine (only applicable to managed tables), then in HMS and finally in the catalog
+   * cache. Failure to add the table in HMS results in the table being dropped from
+   * Kudu. 'response' is populated with the results of this operation. Returns true if
+   * a new table was created as part of this call, false otherwise.
+   */
+  private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+      TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
+    Preconditions.checkState(KuduTable.isKuduTable(newTable));
+    if (Table.isExternalTable(newTable)) {
+      KuduCatalogOpExecutor.populateColumnsFromKudu(newTable);
+    } else {
+      KuduCatalogOpExecutor.createManagedTable(newTable, params);
+    }
+    try {
+      // Add the table to the HMS and the catalog cache. Acquire metastoreDdlLock_ to
+      // ensure the atomicity of these operations.
+      synchronized (metastoreDdlLock_) {
+        try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+          msClient.getHiveClient().createTable(newTable);
+        }
+        // Add the table to the catalog cache
+        Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
+        addTableToCatalogUpdate(newTbl, response.result);
+      }
+    } catch (Exception e) {
+      try {
+        // Error creating the table in HMS, drop the managed table from Kudu.
+        if (!Table.isExternalTable(newTable)) {
+          KuduCatalogOpExecutor.dropTable(newTable, false);
+        }
+      } catch (Exception logged) {
+        String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
+        LOG.error(String.format("Failed to drop Kudu table '%s'", kuduTableName),
            logged);
+        throw new RuntimeException(String.format("Failed to create the table '%s' in " +
+            " the Metastore and the newly created Kudu table '%s' could not be " +
+            " dropped. The log contains more information.", newTable.getTableName(),
+            kuduTableName), e);
+      }
+      if (e instanceof AlreadyExistsException && params.if_not_exists) return false;
+      throw new ImpalaRuntimeException(
+          String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+    }
+    return true;
+  }
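createKuduTable() above follows a create-then-roll-back pattern: the storage-engine
object is created first and is dropped again if the metastore registration fails, so
no orphaned Kudu table is left behind. A generic, hedged sketch of the pattern with
entirely illustrative names:

class CreateRollbackSketch {
  interface Store {
    void create() throws Exception;
    void drop() throws Exception;
  }

  // Create in the storage engine first; undo it if the metastore step fails.
  static void createWithRollback(Store kudu, Store hms) throws Exception {
    kudu.create();
    try {
      hms.create();
    } catch (Exception e) {
      kudu.drop();  // Avoid leaving an orphaned storage-engine table behind.
      throw e;
    }
  }
}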
+
+  /**
+   * Creates a new table. The table is initially created in HMS and, if that operation
+   * succeeds, it is then added in the catalog cache. It also sets HDFS caching if
+   * 'cacheOp' is not null. 'response' is populated with the results of this operation.
+   * Returns true if a new table was created as part of this call, false otherwise.
+   */
+  private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
+      boolean if_not_exists, THdfsCachingOp cacheOp, TDdlExecResponse response)
+      throws ImpalaException {
+    Preconditions.checkState(!KuduTable.isKuduTable(newTable));
+    synchronized (metastoreDdlLock_) {
+      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
+        msClient.getHiveClient().createTable(newTable);
+        // If this table should be cached, and the table location was not specified by
+        // the user, an extra step is needed to read the table to find the location.
+        if (cacheOp != null && cacheOp.isSet_cached() &&
+            newTable.getSd().getLocation() == null) {
+          newTable = msClient.getHiveClient().getTable(
+              newTable.getDbName(), newTable.getTableName());
+        }
+      } catch (Exception e) {
+        if (e instanceof AlreadyExistsException && if_not_exists) return false;
+        throw new ImpalaRuntimeException(
+            String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
+      }
+
+      // Submit the cache request and update the table metadata.
+      if (cacheOp != null && cacheOp.isSet_cached()) {
+        short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
+            JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
+        long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
+            cacheOp.getCache_pool_name(), replication);
+        catalog_.watchCacheDirs(Lists.newArrayList(id),
+            new TTableName(newTable.getDbName(), newTable.getTableName()));
+        applyAlterTable(newTable);
+      }
+      Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
+      addTableToCatalogUpdate(newTbl, response.result);
+    }
+    return true;
   }
 
   /**
@@ -1392,7 +1584,7 @@ public class CatalogOpExecutor {
         new org.apache.hadoop.hive.metastore.api.Table();
     setViewAttributes(params, view);
     LOG.debug(String.format("Creating view %s", tableName));
-    createTable(view, params.if_not_exists, null, null, response);
+    createTable(view, params.if_not_exists, null, response);
   }
 
   /**
@@ -1423,6 +1615,8 @@ public class CatalogOpExecutor {
     Table srcTable = getExistingTable(srcTblName.getDb(), srcTblName.getTbl());
     org.apache.hadoop.hive.metastore.api.Table tbl =
         srcTable.getMetaStoreTable().deepCopy();
+    Preconditions.checkState(!KuduTable.isKuduTable(tbl),
+        "CREATE TABLE LIKE is not supported for Kudu tables.");
     tbl.setDbName(tblName.getDb());
     tbl.setTableName(tblName.getTbl());
     tbl.setOwner(params.getOwner());
@@ -1460,7 +1654,7 @@ public class CatalogOpExecutor {
       tbl.getSd().setLocation(params.getLocation());
     if (fileFormat != null) {
       setStorageDescriptorFileFormat(tbl.getSd(), fileFormat);
-    } else if (fileFormat == null && srcTable instanceof View) {
+    } else if (srcTable instanceof View) {
       // Here, source table is a view which has no input format. So to be
       // consistent with CREATE TABLE, default input format is assumed to be
      // TEXT unless otherwise specified.
@@ -1469,85 +1663,7 @@ public class CatalogOpExecutor {
     // Set the row count of this table to unknown.
     tbl.putToParameters(StatsSetupConst.ROW_COUNT, "-1");
     LOG.debug(String.format("Creating table %s LIKE %s", tblName, srcTblName));
-    createTable(tbl, params.if_not_exists, null, null, response);
-  }
-
-  /**
-   * Creates a new table in the HMS. If ifNotExists=true, no error will be thrown if
-   * the table already exists, otherwise an exception will be thrown.
-   * Accepts an optional 'cacheOp' param, which if specified will cache the table's
-   * HDFS location according to the 'cacheOp' spec after creation.
-   * Stores details of the operations (such as the resulting catalog version) in
-   * 'response' output parameter.
-   * Returns true if a new table was created as part of this call, false otherwise.
-   */
-  private boolean createTable(org.apache.hadoop.hive.metastore.api.Table newTable,
-      boolean ifNotExists, THdfsCachingOp cacheOp, List<TDistributeParam> distribute_by,
-      TDdlExecResponse response)
-      throws ImpalaException {
-    synchronized (metastoreDdlLock_) {
-
-      try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
-        msClient.getHiveClient().createTable(newTable);
-        // If this table should be cached, and the table location was not specified by
-        // the user, an extra step is needed to read the table to find the location.
-        if (cacheOp != null && cacheOp.isSet_cached() &&
-            newTable.getSd().getLocation() == null) {
-          newTable = msClient.getHiveClient().getTable(newTable.getDbName(),
-              newTable.getTableName());
-        }
-      } catch (AlreadyExistsException e) {
-        if (!ifNotExists) {
-          throw new ImpalaRuntimeException(
-              String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-        }
-        LOG.debug(String.format("Ignoring '%s' when creating table %s.%s because " +
-            "IF NOT EXISTS was specified.", e,
-            newTable.getDbName(), newTable.getTableName()));
-        return false;
-      } catch (TException e) {
-        throw new ImpalaRuntimeException(
-            String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
-      }
-
-      // Forward the operation to a specific storage backend. If the operation fails,
-      // delete the just created hive table to avoid inconsistencies.
-      try {
-        createDdlDelegate(newTable).setDistributeParams(distribute_by).createTable();
-      } catch (ImpalaRuntimeException e) {
-        try (MetaStoreClient c = catalog_.getMetaStoreClient()) {
-          c.getHiveClient().dropTable(newTable.getDbName(), newTable.getTableName(),
-              false, ifNotExists);
-        } catch (Exception hE) {
-          throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR,
-              "dropTable"), hE);
-        }
-        throw e;
-      }
-
-      // Submit the cache request and update the table metadata.
-      if (cacheOp != null && cacheOp.isSet_cached()) {
-        short replication = cacheOp.isSetReplication() ? cacheOp.getReplication() :
-            JniCatalogConstants.HDFS_DEFAULT_CACHE_REPLICATION_FACTOR;
-        long id = HdfsCachingUtil.submitCacheTblDirective(newTable,
-            cacheOp.getCache_pool_name(), replication);
-        catalog_.watchCacheDirs(Lists.newArrayList(id),
-            new TTableName(newTable.getDbName(), newTable.getTableName()));
-        applyAlterTable(newTable);
-      }
-      Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
-      addTableToCatalogUpdate(newTbl, response.result);
-    }
-    return true;
-  }
-
-  /**
-   * Instantiate the appropriate DDL delegate for the table. If no known delegate is
-   * available for the table, returns a UnsupportedOpDelegate instance.
-   */
-  private DdlDelegate createDdlDelegate(org.apache.hadoop.hive.metastore.api.Table tab) {
-    if (KuduDdlDelegate.canHandle(tab)) return new KuduDdlDelegate(tab);
-    return new UnsupportedOpDelegate();
+    createTable(tbl, params.if_not_exists, null, response);
   }
 
   /**
@@ -1967,6 +2083,9 @@ public class CatalogOpExecutor {
       switch (params.getTarget()) {
         case TBL_PROPERTY:
           msTbl.getParameters().putAll(properties);
+          if (KuduTable.isKuduTable(msTbl)) {
+            KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
+          }
           break;
         case SERDE_PROPERTY:
           msTbl.getSd().getSerdeInfo().getParameters().putAll(properties);
@@ -2120,7 +2239,6 @@ public class CatalogOpExecutor {
     Preconditions.checkNotNull(cacheOp);
     Preconditions.checkNotNull(params.getPartition_spec());
     // Alter partition params.
-    final String RUNTIME_FILTER_FORMAT = "apply %s on %s";
     TableName tableName = tbl.getTableName();
     HdfsPartition partition = catalog_.getHdfsPartition(
         tableName.getDb(), tableName.getTbl(), params.getPartition_spec());
@@ -2535,16 +2653,6 @@ public class CatalogOpExecutor {
   }
 
   /**
-   * Returns a deep copy of the metastore.api.Table object for the given TableName.
- */ - private org.apache.hadoop.hive.metastore.api.Table getMetaStoreTable( - TableName tableName) throws CatalogException { - Preconditions.checkState(tableName != null && tableName.isFullyQualified()); - return getExistingTable(tableName.getDb(), tableName.getTbl()) - .getMetaStoreTable().deepCopy(); - } - - /** * Returns the metastore.api.Table object from the Hive Metastore for an existing * fully loaded table. */ @@ -2608,7 +2716,7 @@ public class CatalogOpExecutor { /** * Calculates the next transient_lastDdlTime value. */ - private static long calculateDdlTime( + public static long calculateDdlTime( org.apache.hadoop.hive.metastore.api.Table msTbl) { long existingLastDdlTime = CatalogServiceCatalog.getLastDdlTime(msTbl); long currentTime = System.currentTimeMillis() / 1000; @@ -2617,63 +2725,6 @@ } /** - * Utility function that creates a hive.metastore.api.Table object based on the given - * TCreateTableParams. - * TODO: Extract metastore object creation utility functions into a separate - * helper/factory class. - */ - public static org.apache.hadoop.hive.metastore.api.Table - createMetaStoreTable(TCreateTableParams params) { - Preconditions.checkNotNull(params); - TableName tableName = TableName.fromThrift(params.getTable_name()); - org.apache.hadoop.hive.metastore.api.Table tbl = - new org.apache.hadoop.hive.metastore.api.Table(); - tbl.setDbName(tableName.getDb()); - tbl.setTableName(tableName.getTbl()); - tbl.setOwner(params.getOwner()); - if (params.isSetTable_properties()) { - tbl.setParameters(params.getTable_properties()); - } else { - tbl.setParameters(new HashMap<String, String>()); - } - - if (params.getComment() != null) { - tbl.getParameters().put("comment", params.getComment()); - } - if (params.is_external) { - tbl.setTableType(TableType.EXTERNAL_TABLE.toString()); - tbl.putToParameters("EXTERNAL", "TRUE"); - } else { - tbl.setTableType(TableType.MANAGED_TABLE.toString()); - } - - StorageDescriptor sd = HiveStorageDescriptorFactory.createSd( - params.getFile_format(), RowFormat.fromThrift(params.getRow_format())); - - if (params.isSetSerde_properties()) { - if (sd.getSerdeInfo().getParameters() == null) { - sd.getSerdeInfo().setParameters(params.getSerde_properties()); - } else { - sd.getSerdeInfo().getParameters().putAll(params.getSerde_properties()); - } - } - - if (params.getLocation() != null) { - sd.setLocation(params.getLocation()); - } - // Add in all the columns - sd.setCols(buildFieldSchemaList(params.getColumns())); - tbl.setSd(sd); - if (params.getPartition_columns() != null) { - // Add in any partition keys that were specified - tbl.setPartitionKeys(buildFieldSchemaList(params.getPartition_columns())); - } else { - tbl.setPartitionKeys(new ArrayList<FieldSchema>()); - } - return tbl; - } - - /** * Executes a TResetMetadataRequest and returns the result as a * TResetMetadataResponse.
Based on the request parameters, this operation * may do one of three things: http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/Frontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java index 00a3d93..6d535fd 100644 --- a/fe/src/main/java/org/apache/impala/service/Frontend.java +++ b/fe/src/main/java/org/apache/impala/service/Frontend.java @@ -170,9 +170,11 @@ public class Frontend { private final AtomicReference<AuthorizationChecker> authzChecker_; private final ScheduledExecutorService policyReader_ = Executors.newScheduledThreadPool(1); + private final String defaultKuduMasterHosts_; - public Frontend(AuthorizationConfig authorizationConfig) { - this(authorizationConfig, new ImpaladCatalog()); + public Frontend(AuthorizationConfig authorizationConfig, + String defaultKuduMasterHosts) { + this(authorizationConfig, new ImpaladCatalog(defaultKuduMasterHosts)); } /** @@ -181,6 +183,7 @@ public class Frontend { public Frontend(AuthorizationConfig authorizationConfig, ImpaladCatalog catalog) { authzConfig_ = authorizationConfig; impaladCatalog_ = catalog; + defaultKuduMasterHosts_ = catalog.getDefaultKuduMasterHosts(); authzChecker_ = new AtomicReference<AuthorizationChecker>( new AuthorizationChecker(authzConfig_, impaladCatalog_.getAuthPolicy())); // If authorization is enabled, reload the policy on a regular basis. @@ -226,7 +229,7 @@ public class Frontend { // If this is not a delta, this update should replace the current // Catalog contents so create a new catalog and populate it. - if (!req.is_delta) catalog = new ImpaladCatalog(); + if (!req.is_delta) catalog = new ImpaladCatalog(defaultKuduMasterHosts_); TUpdateCatalogCacheResponse response = catalog.updateCatalog(req); http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/JniCatalog.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniCatalog.java b/fe/src/main/java/org/apache/impala/service/JniCatalog.java index fc8deaf..7d0af54 100644 --- a/fe/src/main/java/org/apache/impala/service/JniCatalog.java +++ b/fe/src/main/java/org/apache/impala/service/JniCatalog.java @@ -102,7 +102,7 @@ public class JniCatalog { try { catalog_.reset(); } catch (CatalogException e) { - LOG.error("Error initialializing Catalog. Please run 'invalidate metadata'", e); + LOG.error("Error initializing Catalog.
Please run 'invalidate metadata'", e); } catalogOpExecutor_ = new CatalogOpExecutor(catalog_); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/JniFrontend.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java index 07d6ec6..0d502e5 100644 --- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java +++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java @@ -117,7 +117,8 @@ public class JniFrontend { */ public JniFrontend(boolean lazy, String serverName, String authorizationPolicyFile, String sentryConfigFile, String authPolicyProviderClass, int impalaLogLevel, - int otherLogLevel, boolean allowAuthToLocal) throws InternalException { + int otherLogLevel, boolean allowAuthToLocal, String defaultKuduMasterHosts) + throws InternalException { BackendConfig.setAuthToLocal(allowAuthToLocal); GlogAppender.Install(TLogLevel.values()[impalaLogLevel], TLogLevel.values()[otherLogLevel]); @@ -136,7 +137,7 @@ public class JniFrontend { } LOG.info(JniUtil.getJavaVersion()); - frontend_ = new Frontend(authConfig); + frontend_ = new Frontend(authConfig, defaultKuduMasterHosts); } /** http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java new file mode 100644 index 0000000..bd6d0fe --- /dev/null +++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java @@ -0,0 +1,240 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.impala.service; + +import java.lang.NumberFormatException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.impala.analysis.ToSqlUtils; +import org.apache.impala.catalog.KuduTable; +import org.apache.impala.catalog.Table; +import org.apache.impala.catalog.TableNotFoundException; +import org.apache.impala.catalog.Type; +import org.apache.impala.common.ImpalaRuntimeException; +import org.apache.impala.thrift.TCreateTableParams; +import org.apache.impala.thrift.TDistributeParam; +import org.apache.impala.util.KuduUtil; +import org.apache.kudu.ColumnSchema.ColumnSchemaBuilder; +import org.apache.kudu.ColumnSchema; +import org.apache.kudu.Schema; +import org.apache.kudu.client.CreateTableOptions; +import org.apache.kudu.client.KuduClient; +import org.apache.kudu.client.PartialRow; +import org.apache.log4j.Logger; + +/** + * This is a helper for the CatalogOpExecutor to provide Kudu related DDL functionality + * such as creating and dropping tables from Kudu. + */ +public class KuduCatalogOpExecutor { + public static final Logger LOG = Logger.getLogger(KuduCatalogOpExecutor.class); + + /** + * Create a table in Kudu with a schema equivalent to the schema stored in 'msTbl'. + * Throws an exception if 'msTbl' represents an external table or if the table couldn't + * be created in Kudu. + */ + static void createManagedTable(org.apache.hadoop.hive.metastore.api.Table msTbl, + TCreateTableParams params) throws ImpalaRuntimeException { + Preconditions.checkState(!Table.isExternalTable(msTbl)); + String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME); + String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS); + LOG.debug(String.format("Creating table '%s' in master '%s'", kuduTableName, + masterHosts)); + try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + // TODO: The IF NOT EXISTS case should be handled by Kudu to ensure atomicity. + // (see KUDU-1710). + if (kudu.tableExists(kuduTableName)) { + if (params.if_not_exists) return; + throw new ImpalaRuntimeException(String.format( + "Table '%s' already exists in Kudu.", kuduTableName)); + } + Schema schema = createTableSchema(msTbl, params); + CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema); + kudu.createTable(kuduTableName, schema, tableOpts); + } catch (Exception e) { + throw new ImpalaRuntimeException(String.format("Error creating table '%s'", + kuduTableName), e); + } + } + + /** + * Creates the schema of a new Kudu table. 
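+ *
+ * Illustration (hypothetical columns, not from this patch): for a table with
+ * columns (id BIGINT, name STRING) and primary key column 'id', the loop in
+ * this method produces the equivalent of:
+ * <pre>
+ *   new ColumnSchemaBuilder("id", org.apache.kudu.Type.INT64)
+ *       .key(true).nullable(false).build();
+ *   new ColumnSchemaBuilder("name", org.apache.kudu.Type.STRING)
+ *       .key(false).nullable(true).build();
+ * </pre>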
+ */ + private static Schema createTableSchema( + org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params) + throws ImpalaRuntimeException { + Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names()); + List<FieldSchema> fieldSchemas = msTbl.getSd().getCols(); + List<ColumnSchema> colSchemas = new ArrayList<>(fieldSchemas.size()); + for (FieldSchema fieldSchema : fieldSchemas) { + Type type = Type.parseColumnType(fieldSchema.getType()); + Preconditions.checkState(type != null); + org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type); + // Create the actual column and check if the column is a key column + ColumnSchemaBuilder csb = + new ColumnSchemaBuilder(fieldSchema.getName(), kuduType); + boolean isKeyCol = keyColNames.contains(fieldSchema.getName()); + csb.key(isKeyCol); + csb.nullable(!isKeyCol); + colSchemas.add(csb.build()); + } + return new Schema(colSchemas); + } + + /** + * Builds the table options of a new Kudu table. + */ + private static CreateTableOptions buildTableOptions( + org.apache.hadoop.hive.metastore.api.Table msTbl, + TCreateTableParams params, Schema schema) throws ImpalaRuntimeException { + CreateTableOptions tableOpts = new CreateTableOptions(); + // Set the distribution schemes + List<TDistributeParam> distributeParams = params.getDistribute_by(); + if (distributeParams != null) { + boolean hasRangePartitioning = false; + for (TDistributeParam distParam : distributeParams) { + if (distParam.isSetBy_hash_param()) { + Preconditions.checkState(!distParam.isSetBy_range_param()); + tableOpts.addHashPartitions(distParam.getBy_hash_param().getColumns(), + distParam.getBy_hash_param().getNum_buckets()); + } else { + Preconditions.checkState(distParam.isSetBy_range_param()); + hasRangePartitioning = true; + tableOpts.setRangePartitionColumns( + distParam.getBy_range_param().getColumns()); + for (PartialRow partialRow : + KuduUtil.parseSplits(schema, distParam.getBy_range_param())) { + tableOpts.addSplitRow(partialRow); + } + } + } + // If no range-based distribution is specified in a CREATE TABLE statement, Kudu + // generates one by default that includes all the primary key columns. To prevent + // this from happening, explicitly set the range partition columns to be + // an empty list. + if (!hasRangePartitioning) { + tableOpts.setRangePartitionColumns(Collections.<String>emptyList()); + } + } + + // Set the number of table replicas, if specified. + String replication = msTbl.getParameters().get(KuduTable.KEY_TABLET_REPLICAS); + if (!Strings.isNullOrEmpty(replication)) { + try { + int r = Integer.parseInt(replication); + Preconditions.checkState(r > 0); + tableOpts.setNumReplicas(r); + } catch (NumberFormatException e) { + throw new ImpalaRuntimeException(String.format("Invalid number of table " + + "replicas specified: '%s'", replication), e); + } + } + return tableOpts; + } + + /** + * Drops the table in Kudu. If the table does not exist and 'ifExists' is false, a + * TableNotFoundException is thrown. If the table exists and could not be dropped, + * an ImpalaRuntimeException is thrown.
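+ *
+ * The table to drop and the masters to contact are resolved from the HMS table
+ * parameters, e.g. (values are illustrative only):
+ * <pre>
+ *   msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME);    // "impala::db.tbl"
+ *   msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS);  // "master1:7051,master2:7051"
+ * </pre>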
+ */ + static void dropTable(org.apache.hadoop.hive.metastore.api.Table msTbl, + boolean ifExists) throws ImpalaRuntimeException, TableNotFoundException { + Preconditions.checkState(!Table.isExternalTable(msTbl)); + String tableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME); + String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS); + LOG.debug(String.format("Dropping table '%s' from master '%s'", tableName, + masterHosts)); + try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + Preconditions.checkState(!Strings.isNullOrEmpty(tableName)); + // TODO: The IF EXISTS case should be handled by Kudu to ensure atomicity. + // (see KUDU-1710). + if (kudu.tableExists(tableName)) { + kudu.deleteTable(tableName); + } else if (!ifExists) { + throw new TableNotFoundException(String.format( + "Table '%s' does not exist in Kudu master(s) '%s'.", tableName, masterHosts)); + } + } catch (Exception e) { + throw new ImpalaRuntimeException(String.format("Error dropping table '%s'", + tableName), e); + } + } + + /** + * Reads the column definitions from a Kudu table and populates 'msTbl' with + * an equivalent schema. Throws an exception if any errors are encountered. + */ + public static void populateColumnsFromKudu( + org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException { + org.apache.hadoop.hive.metastore.api.Table msTblCopy = msTbl.deepCopy(); + List<FieldSchema> cols = msTblCopy.getSd().getCols(); + String kuduTableName = msTblCopy.getParameters().get(KuduTable.KEY_TABLE_NAME); + Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName)); + String masterHosts = msTblCopy.getParameters().get(KuduTable.KEY_MASTER_HOSTS); + LOG.debug(String.format("Loading schema of table '%s' from master '%s'", + kuduTableName, masterHosts)); + try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + if (!kudu.tableExists(kuduTableName)) { + throw new ImpalaRuntimeException(String.format("Table does not exist in Kudu: " + + "'%s'", kuduTableName)); + } + org.apache.kudu.client.KuduTable kuduTable = kudu.openTable(kuduTableName); + // Replace the columns in the Metastore table with the columns from the recently + // accessed Kudu schema. + cols.clear(); + for (ColumnSchema colSchema : kuduTable.getSchema().getColumns()) { + Type type = KuduUtil.toImpalaType(colSchema.getType()); + cols.add(new FieldSchema(colSchema.getName(), type.toSql().toLowerCase(), null)); + } + } catch (Exception e) { + throw new ImpalaRuntimeException(String.format("Error loading schema of table " + + "'%s'", kuduTableName), e); + } + List<FieldSchema> newCols = msTbl.getSd().getCols(); + newCols.clear(); + newCols.addAll(cols); + } + + /** + * Validates the table properties of a Kudu table. It checks that the specified master + * addresses point to valid Kudu masters and that the table exists. + * Throws an ImpalaRuntimeException if this is not the case.
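+ *
+ * Typical call site (mirrors the ALTER TABLE SET TBLPROPERTIES path in
+ * CatalogOpExecutor earlier in this patch):
+ * <pre>
+ *   if (KuduTable.isKuduTable(msTbl)) {
+ *     KuduCatalogOpExecutor.validateKuduTblExists(msTbl);
+ *   }
+ * </pre>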
+ */ + public static void validateKuduTblExists( + org.apache.hadoop.hive.metastore.api.Table msTbl) throws ImpalaRuntimeException { + String masterHosts = msTbl.getParameters().get(KuduTable.KEY_MASTER_HOSTS); + Preconditions.checkState(!Strings.isNullOrEmpty(masterHosts)); + String kuduTableName = msTbl.getParameters().get(KuduTable.KEY_TABLE_NAME); + Preconditions.checkState(!Strings.isNullOrEmpty(kuduTableName)); + try (KuduClient kudu = new KuduClient.KuduClientBuilder(masterHosts).build()) { + // tableExists() returns false for a missing table; check the result so that + // a missing table surfaces as the documented exception. + if (!kudu.tableExists(kuduTableName)) { + throw new ImpalaRuntimeException(String.format("Kudu table '%s' does not exist " + + "on master '%s'", kuduTableName, masterHosts)); + } + } catch (ImpalaRuntimeException e) { + throw e; + } catch (Exception e) { + throw new ImpalaRuntimeException(String.format("Error accessing Kudu table '%s' " + + "on master '%s'", kuduTableName, masterHosts), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/fe/src/main/java/org/apache/impala/util/KuduUtil.java ---------------------------------------------------------------------- diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java index b9f8653..a679032 100644 --- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java +++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java @@ -18,14 +18,15 @@ package org.apache.impala.util; import java.io.StringReader; -import java.util.HashMap; import java.util.HashSet; import java.util.List; import javax.json.Json; import javax.json.JsonArray; import javax.json.JsonReader; +import org.apache.impala.catalog.Catalog; import org.apache.impala.catalog.ScalarType; +import org.apache.impala.catalog.Type; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.thrift.TDistributeByRangeParam; import org.apache.impala.thrift.TRangeLiteral; @@ -33,48 +34,17 @@ import org.apache.impala.thrift.TRangeLiteralList; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.Table; import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; -import org.apache.kudu.Type; -import org.apache.kudu.client.KuduTable; import org.apache.kudu.client.PartialRow; -import static org.apache.impala.catalog.Type.parseColumnType; import static java.lang.String.format; public class KuduUtil { private static final String SPLIT_KEYS_ERROR_MESSAGE = "Error parsing splits keys."; - - /** - * Compare the schema of a HMS table and a Kudu table. Returns true if both tables have - * a matching schema. - */ - public static boolean compareSchema(Table msTable, KuduTable kuduTable) - throws ImpalaRuntimeException { - List<FieldSchema> msFields = msTable.getSd().getCols(); - List<ColumnSchema> kuduFields = kuduTable.getSchema().getColumns(); - if (msFields.size() != kuduFields.size()) return false; - - HashMap<String, ColumnSchema> kuduFieldMap = Maps.newHashMap(); - for (ColumnSchema kuduField : kuduFields) { - kuduFieldMap.put(kuduField.getName().toUpperCase(), kuduField); - } - - for (FieldSchema msField : msFields) { - ColumnSchema kuduField = kuduFieldMap.get(msField.getName().toUpperCase()); - if (kuduField == null - || fromImpalaType(parseColumnType(msField.getType())) != kuduField.getType()) { - return false; - } - } - - return true; - } + private static final String KUDU_TABLE_NAME_PREFIX = "impala::"; /** * Parses split keys from statements. @@ -145,10 +115,9 @@ public class KuduUtil { /** * Sets the value in 'key' at 'pos', given the json representation.
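*
* Illustration (hypothetical values): for a JSON split row [100, "a"] against
* an (INT32, STRING) schema, pos 0 takes the INT32 branch below, i.e.
* <pre>
*   key.addInt(0, array.getInt(0));
* </pre>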
*/ - private static void setKey(Type type, JsonArray array, int pos, PartialRow key) - throws ImpalaRuntimeException { + private static void setKey(org.apache.kudu.Type type, JsonArray array, int pos, + PartialRow key) throws ImpalaRuntimeException { switch (type) { - case BOOL: key.addBoolean(pos, array.getBoolean(pos)); break; case INT8: key.addByte(pos, (byte) array.getInt(pos)); break; case INT16: key.addShort(pos, (short) array.getInt(pos)); break; case INT32: key.addInt(pos, array.getInt(pos)); break; @@ -163,13 +132,9 @@ public class KuduUtil { /** * Sets the value in 'key' at 'pos', given the range literal. */ - private static void setKey(Type type, TRangeLiteral literal, int pos, String colName, - PartialRow key) throws ImpalaRuntimeException { + private static void setKey(org.apache.kudu.Type type, TRangeLiteral literal, int pos, + String colName, PartialRow key) throws ImpalaRuntimeException { switch (type) { - case BOOL: - checkCorrectType(literal.isSetBool_literal(), type, colName, literal); - key.addBoolean(pos, literal.isBool_literal()); - break; case INT8: checkCorrectType(literal.isSetInt_literal(), type, colName, literal); key.addByte(pos, (byte) literal.getInt_literal()); @@ -200,8 +165,8 @@ public class KuduUtil { * If correctType is true, returns. Otherwise throws a formatted error message * indicating that the range literal does not match the expected column type. */ - private static void checkCorrectType(boolean correctType, Type t, String colName, - TRangeLiteral literal) throws ImpalaRuntimeException { + private static void checkCorrectType(boolean correctType, org.apache.kudu.Type t, + String colName, TRangeLiteral literal) throws ImpalaRuntimeException { if (correctType) return; throw new ImpalaRuntimeException( format("Expected %s literal for column '%s', got '%s'", t.getName(), colName, @@ -220,11 +185,24 @@ public class KuduUtil { return Lists.newArrayList(Splitter.on(",").trimResults().split(cols.toLowerCase())); } + /** + * Returns true if 'type' is supported for Kudu key columns (integer and string + * types). + */ + public static boolean isSupportedKeyType(org.apache.impala.catalog.Type type) { + return type.isIntegerType() || type.isStringType(); + } + + /** + * Returns the name that should be used in Kudu when creating a table, assuming a + * custom name was not provided. + */ + public static String getDefaultCreateKuduTableName(String metastoreDbName, + String metastoreTableName) { + return KUDU_TABLE_NAME_PREFIX + metastoreDbName + "." + metastoreTableName; + } + /** * Converts a given Impala catalog type to the Kudu type. Throws an exception if the * type cannot be converted.
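*
* Example mappings implied by the switch below (note the mapping is not
* one-to-one: CHAR, VARCHAR and STRING all map to the Kudu STRING type):
* <pre>
*   fromImpalaType(Type.TINYINT) == org.apache.kudu.Type.INT8
*   fromImpalaType(Type.STRING)  == org.apache.kudu.Type.STRING
* </pre>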
*/ - public static Type fromImpalaType(org.apache.impala.catalog.Type t) + public static org.apache.kudu.Type fromImpalaType(Type t) throws ImpalaRuntimeException { if (!t.isScalarType()) { throw new ImpalaRuntimeException(format( @@ -232,16 +210,16 @@ public class KuduUtil { } ScalarType s = (ScalarType) t; switch (s.getPrimitiveType()) { - case TINYINT: return Type.INT8; - case SMALLINT: return Type.INT16; - case INT: return Type.INT32; - case BIGINT: return Type.INT64; - case BOOLEAN: return Type.BOOL; - case CHAR: return Type.STRING; - case STRING: return Type.STRING; - case VARCHAR: return Type.STRING; - case DOUBLE: return Type.DOUBLE; - case FLOAT: return Type.FLOAT; + case TINYINT: return org.apache.kudu.Type.INT8; + case SMALLINT: return org.apache.kudu.Type.INT16; + case INT: return org.apache.kudu.Type.INT32; + case BIGINT: return org.apache.kudu.Type.INT64; + case BOOLEAN: return org.apache.kudu.Type.BOOL; + case CHAR: return org.apache.kudu.Type.STRING; + case STRING: return org.apache.kudu.Type.STRING; + case VARCHAR: return org.apache.kudu.Type.STRING; + case DOUBLE: return org.apache.kudu.Type.DOUBLE; + case FLOAT: return org.apache.kudu.Type.FLOAT; /* Fall through below */ case INVALID_TYPE: case NULL_TYPE: @@ -256,11 +234,27 @@ public class KuduUtil { } } + public static Type toImpalaType(org.apache.kudu.Type t) + throws ImpalaRuntimeException { + switch (t) { + case BOOL: return Type.BOOLEAN; + case DOUBLE: return Type.DOUBLE; + case FLOAT: return Type.FLOAT; + case INT8: return Type.TINYINT; + case INT16: return Type.SMALLINT; + case INT32: return Type.INT; + case INT64: return Type.BIGINT; + case STRING: return Type.STRING; + default: + throw new ImpalaRuntimeException(String.format( + "Kudu type %s is not supported in Impala", t)); + } + } + /** * Returns the string value of the RANGE literal. */ static String toString(TRangeLiteral l) throws ImpalaRuntimeException { - if (l.isSetBool_literal()) return String.valueOf(l.bool_literal); if (l.isSetString_literal()) return String.valueOf(l.string_literal); if (l.isSetInt_literal()) return String.valueOf(l.int_literal); throw new ImpalaRuntimeException("Unsupported type for RANGE literal.");
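Taken together, fromImpalaType() and toImpalaType() round-trip every scalar type the two systems share. A minimal sketch of that property, assuming the Impala frontend classes and the Kudu client jar are on the classpath (the class name below is hypothetical, not part of this patch):

import org.apache.impala.catalog.Type;
import org.apache.impala.util.KuduUtil;

public class KuduTypeRoundTrip {
  public static void main(String[] args) throws Exception {
    // CHAR and VARCHAR are omitted: they collapse to Kudu STRING, so the
    // mapping is lossy for those two types.
    Type[] scalars = {Type.TINYINT, Type.SMALLINT, Type.INT, Type.BIGINT,
        Type.BOOLEAN, Type.STRING, Type.DOUBLE, Type.FLOAT};
    for (Type t : scalars) {
      org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(t);
      // Each line should print a type, its Kudu equivalent, and the same
      // Impala type again, e.g. "TINYINT -> INT8 -> TINYINT".
      System.out.println(t.toSql() + " -> " + kuduType + " -> "
          + KuduUtil.toImpalaType(kuduType).toSql());
    }
  }
}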