impala-commits mailing list archives

From: he...@apache.org
Subject: [2/4] incubator-impala git commit: IMPALA-3726: Add support for Kudu-specific column options
Date: Sat, 19 Nov 2016 05:40:49 GMT
IMPALA-3726: Add support for Kudu-specific column options

This commit adds support for Kudu-specific column options in CREATE
TABLE statements. The syntax is:
CREATE TABLE tbl_name ([col_name type [PRIMARY KEY] [option [...]]] [, ...])
where option is one of:
    NULL
  | NOT NULL
  | ENCODING encoding_val
  | COMPRESSION compression_algorithm
  | DEFAULT expr
  | BLOCK_SIZE num

The output of the SHOW CREATE TABLE statement was altered to include all the specified
column options for Kudu tables.
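
For example, a statement like the following exercises the new options. This is
a hypothetical sketch: the table name, column names, bucket count, and master
address are illustrative, and the DISTRIBUTE BY clause plus the
kudu.master_addresses table property reflect the Kudu DDL syntax at the time
of this commit.

CREATE TABLE customers (
  -- Key column with an explicit encoding and compression algorithm.
  id BIGINT PRIMARY KEY ENCODING BIT_SHUFFLE COMPRESSION LZ4,
  -- Non-nullable column with a constant default value.
  name STRING NOT NULL DEFAULT 'unknown',
  -- Nullable column with a desired block size in bytes.
  zip INT NULL BLOCK_SIZE 8192
)
DISTRIBUTE BY HASH (id) INTO 3 BUCKETS
STORED AS KUDU
TBLPROPERTIES ('kudu.master_addresses' = 'kudu-master.example.com:7051');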

Change-Id: I727b9ae1b7b2387db752b58081398dd3f3449c02
Reviewed-on: http://gerrit.cloudera.org:8080/5026
Reviewed-by: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/3db5ced4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/3db5ced4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/3db5ced4

Branch: refs/heads/master
Commit: 3db5ced4cee8967f28e337747efba902ce71cfee
Parents: 60414f0
Author: Dimitris Tsirogiannis <dtsirogiannis@cloudera.com>
Authored: Wed Nov 9 15:11:07 2016 -0800
Committer: Internal Jenkins <cloudera-hudson@gerrit.cloudera.org>
Committed: Fri Nov 18 11:41:01 2016 +0000

----------------------------------------------------------------------
 be/src/exec/kudu-table-sink.cc                  |   6 +-
 common/thrift/CatalogObjects.thrift             |  20 +-
 fe/src/main/cup/sql-parser.cup                  | 240 +++++++++++++------
 .../analysis/AlterTableAddReplaceColsStmt.java  |   2 +-
 .../analysis/AlterTableChangeColStmt.java       |   2 +-
 .../org/apache/impala/analysis/ColumnDef.java   | 196 +++++++++++++--
 .../analysis/CreateOrAlterViewStmtBase.java     |   5 +-
 .../analysis/CreateTableAsSelectStmt.java       |   9 +-
 .../analysis/CreateTableLikeFileStmt.java       |   8 +-
 .../org/apache/impala/analysis/InsertStmt.java  |  58 +++--
 .../org/apache/impala/analysis/TableDef.java    |  16 +-
 .../org/apache/impala/analysis/ToSqlUtils.java  |  16 ++
 .../java/org/apache/impala/catalog/Column.java  |  10 +-
 .../org/apache/impala/catalog/KuduColumn.java   |  86 ++++++-
 .../org/apache/impala/catalog/KuduTable.java    |  13 +-
 .../java/org/apache/impala/catalog/Table.java   |  17 +-
 .../java/org/apache/impala/planner/Planner.java |   2 +
 .../impala/service/KuduCatalogOpExecutor.java   |  32 ++-
 .../apache/impala/util/AvroSchemaParser.java    |  18 +-
 .../org/apache/impala/util/AvroSchemaUtils.java |  17 +-
 .../java/org/apache/impala/util/KuduUtil.java   | 157 +++++++++++-
 fe/src/main/jflex/sql-scanner.flex              |   4 +
 .../apache/impala/analysis/AnalyzeDDLTest.java  |  96 +++++++-
 .../org/apache/impala/analysis/ParserTest.java  |  69 +++++-
 .../functional/functional_schema_template.sql   |  43 ++--
 .../queries/QueryTest/kudu_delete.test          |   9 +-
 .../queries/QueryTest/kudu_insert.test          |   4 +-
 .../queries/QueryTest/kudu_update.test          |   7 +-
 .../queries/QueryTest/kudu_upsert.test          |  11 +-
 tests/query_test/test_kudu.py                   |  52 ++--
 tests/shell/test_shell_commandline.py           |   2 +-
 31 files changed, 973 insertions(+), 254 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 5dd6336..a195e84 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -195,17 +195,13 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
     bool add_row = true;
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
-      // For INSERT, output_expr_ctxs_ will contain all columns of the table in order.
-      // For UPDATE and UPSERT, output_expr_ctxs_ only contains the columns that the op
+      // output_expr_ctxs_ only contains the columns that the op
       // applies to, i.e. columns explicitly mentioned in the query, and
       // referenced_columns is then used to map to actual column positions.
       int col = kudu_table_sink_.referenced_columns.empty() ?
           j : kudu_table_sink_.referenced_columns[j];
 
       void* value = output_expr_ctxs_[j]->GetValue(current_row);
-
-      // If the value is NULL, we only need to explicitly set it for UPDATE and UPSERT.
-      // For INSERT, it can be ignored as unspecified cols will be implicitly set to NULL.
       if (value == NULL) {
         if (table_schema.Column(j).is_nullable()) {
           KUDU_RETURN_IF_ERROR(write->mutable_row()->SetNull(col),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/common/thrift/CatalogObjects.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 9f0c6d4..10cb777 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -72,7 +72,18 @@ enum THdfsCompression {
   SNAPPY,
   SNAPPY_BLOCKED,
   LZO,
-  LZ4
+  LZ4,
+  ZLIB
+}
+
+enum TColumnEncoding {
+  AUTO,
+  PLAIN,
+  PREFIX,
+  GROUP_VARINT,
+  RLE,
+  DICTIONARY,
+  BIT_SHUFFLE
 }
 
 enum THdfsSeqCompressionMode {
@@ -191,11 +202,14 @@ struct TColumn {
   8: optional string column_qualifier
   9: optional bool is_binary
 
-  // Indicates whether this is a Kudu column. If true implies all following Kudu specific
-  // fields are set.
+  // All the following are Kudu-specific column properties
   10: optional bool is_kudu_column
   11: optional bool is_key
   12: optional bool is_nullable
+  13: optional TColumnEncoding encoding
+  14: optional THdfsCompression compression
+  15: optional Exprs.TExpr default_value
+  16: optional i32 block_size
 }
 
 // Represents a block in an HDFS file

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/cup/sql-parser.cup
----------------------------------------------------------------------
diff --git a/fe/src/main/cup/sql-parser.cup b/fe/src/main/cup/sql-parser.cup
index 1884036..2fc765d 100644
--- a/fe/src/main/cup/sql-parser.cup
+++ b/fe/src/main/cup/sql-parser.cup
@@ -18,15 +18,18 @@
 package org.apache.impala.analysis;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java_cup.runtime.Symbol;
 
 import org.apache.impala.analysis.ColumnDef;
+import org.apache.impala.analysis.ColumnDef.Option;
 import org.apache.impala.analysis.UnionStmt.Qualifier;
 import org.apache.impala.analysis.UnionStmt.UnionOperand;
 import org.apache.impala.analysis.RangePartition;
@@ -240,16 +243,17 @@ parser code {:
 // ALL KEYWORDS ALSO NEED TO BE ADDED TO THE ident_or_kw PRODUCTION.
 terminal
   KW_ADD, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_ANALYTIC, KW_AND, KW_ANTI, KW_API_VERSION,
-  KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BOOLEAN,
-  KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE, KW_CHAR,
-  KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPUTE, KW_CREATE,
-  KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME,
-  KW_DECIMAL, KW_DELETE, KW_DELIMITED, KW_DESC, KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE,
-  KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN,
-  KW_EXTENDED, KW_EXTERNAL, KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN,
-  KW_FIRST, KW_FLOAT, KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL,
-  KW_FUNCTION, KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF,
-  KW_ILIKE, KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
+  KW_ARRAY, KW_AS, KW_ASC, KW_AVRO, KW_BETWEEN, KW_BIGINT, KW_BINARY, KW_BLOCKSIZE,
+  KW_BOOLEAN, KW_BUCKETS, KW_BY, KW_CACHED, KW_CASCADE, KW_CASE, KW_CAST, KW_CHANGE,
+  KW_CHAR, KW_CLASS, KW_CLOSE_FN, KW_COLUMN, KW_COLUMNS, KW_COMMENT, KW_COMPRESSION,
+  KW_COMPUTE, KW_CREATE, KW_CROSS, KW_CURRENT, KW_DATA, KW_DATABASE, KW_DATABASES,
+  KW_DATE, KW_DATETIME, KW_DECIMAL, KW_DEFAULT, KW_DELETE, KW_DELIMITED, KW_DESC,
+  KW_DESCRIBE, KW_DISTINCT, KW_DISTRIBUTE, KW_DIV, KW_DOUBLE, KW_DROP, KW_ELSE,
+  KW_ENCODING, KW_END, KW_ESCAPED, KW_EXISTS, KW_EXPLAIN, KW_EXTENDED, KW_EXTERNAL,
+  KW_FALSE, KW_FIELDS, KW_FILEFORMAT, KW_FILES, KW_FINALIZE_FN, KW_FIRST, KW_FLOAT,
+  KW_FOLLOWING, KW_FOR, KW_FORMAT, KW_FORMATTED, KW_FROM, KW_FULL, KW_FUNCTION,
+  KW_FUNCTIONS, KW_GRANT, KW_GROUP, KW_HASH, KW_IGNORE, KW_HAVING, KW_IF, KW_ILIKE,
+  KW_IN, KW_INCREMENTAL, KW_INIT_FN, KW_INNER, KW_INPATH, KW_INSERT, KW_INT,
   KW_INTERMEDIATE, KW_INTERVAL, KW_INTO, KW_INVALIDATE, KW_IREGEXP, KW_IS, KW_JOIN,
   KW_KUDU, KW_LAST, KW_LEFT, KW_LIKE, KW_LIMIT, KW_LINES, KW_LOAD, KW_LOCATION, KW_MAP,
   KW_MERGE_FN, KW_METADATA, KW_NOT, KW_NULL, KW_NULLS, KW_OFFSET, KW_ON, KW_OR, KW_ORDER,
@@ -280,6 +284,10 @@ terminal String STRING_LITERAL;
 terminal String UNMATCHED_STRING_LITERAL;
 terminal String UNEXPECTED_CHAR;
 
+// IMPALA-3726 introduced the DEFAULT keyword which could break existing applications
+// that use the identifier "DEFAULT" as database, column or table names. To avoid that,
+// the ident_or_default non-terminal is introduced and should be used instead of IDENT.
+nonterminal String ident_or_keyword, ident_or_default;
 nonterminal StatementBase stmt;
 // Single select statement.
 nonterminal SelectStmt select_stmt;
@@ -399,7 +407,6 @@ nonterminal CreateDataSrcStmt create_data_src_stmt;
 nonterminal DropDataSrcStmt drop_data_src_stmt;
 nonterminal ShowDataSrcsStmt show_data_srcs_stmt;
 nonterminal StructField struct_field_def;
-nonterminal String ident_or_keyword;
 nonterminal DistributeParam distribute_hash_param;
 nonterminal List<RangePartition> range_params_list;
 nonterminal RangePartition range_param;
@@ -415,7 +422,7 @@ nonterminal ArrayList<StructField> struct_field_def_list;
 // Options for DDL commands - CREATE/DROP/ALTER
 nonterminal HdfsCachingOp cache_op_val;
 nonterminal BigDecimal opt_cache_op_replication;
-nonterminal String comment_val;
+nonterminal String comment_val, opt_comment_val;
 nonterminal Boolean external_val;
 nonterminal Boolean purge_val;
 nonterminal String opt_init_string_val;
@@ -444,6 +451,13 @@ nonterminal String opt_kw_column;
 nonterminal String opt_kw_table;
 nonterminal Boolean overwrite_val;
 nonterminal Boolean cascade_val;
+nonterminal Boolean nullability_val;
+nonterminal String encoding_val;
+nonterminal String compression_val;
+nonterminal Expr default_val;
+nonterminal LiteralExpr block_size_val;
+nonterminal Pair<Option, Object> column_option;
+nonterminal Map<Option, Object> column_options_map;
 
 // For GRANT/REVOKE/AUTH DDL statements
 nonterminal ShowRolesStmt show_roles_stmt;
@@ -487,6 +501,7 @@ nonterminal TFunctionCategory opt_function_category;
 precedence left KW_OR;
 precedence left KW_AND;
 precedence right KW_NOT, NOT;
+precedence left KW_DEFAULT;
 precedence left KW_BETWEEN, KW_IN, KW_IS, KW_EXISTS;
 precedence left KW_LIKE, KW_RLIKE, KW_ILIKE, KW_REGEXP, KW_IREGEXP;
 precedence left EQUAL, NOTEQUAL, LESSTHAN, GREATERTHAN, KW_FROM, KW_DISTINCT;
@@ -789,31 +804,33 @@ opt_kw_table ::=
 show_roles_stmt ::=
   KW_SHOW KW_ROLES
   {: RESULT = new ShowRolesStmt(false, null); :}
-  | KW_SHOW KW_ROLE KW_GRANT KW_GROUP IDENT:group
+  | KW_SHOW KW_ROLE KW_GRANT KW_GROUP ident_or_default:group
   {: RESULT = new ShowRolesStmt(false, group); :}
   | KW_SHOW KW_CURRENT KW_ROLES
   {: RESULT = new ShowRolesStmt(true, null); :}
   ;
 
 show_grant_role_stmt ::=
-  KW_SHOW KW_GRANT KW_ROLE IDENT:role
+  KW_SHOW KW_GRANT KW_ROLE ident_or_default:role
   {: RESULT = new ShowGrantRoleStmt(role, null); :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON server_ident:server_kw
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON server_ident:server_kw
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createServerScopedPriv(TPrivilegeLevel.ALL));
   :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_DATABASE IDENT:db_name
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON
+    KW_DATABASE ident_or_default:db_name
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createDbScopedPriv(TPrivilegeLevel.ALL, db_name));
   :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON KW_TABLE table_name:tbl_name
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON KW_TABLE table_name:tbl_name
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createTableScopedPriv(TPrivilegeLevel.ALL, tbl_name));
   :}
-  | KW_SHOW KW_GRANT KW_ROLE IDENT:role KW_ON uri_ident:uri_kw STRING_LITERAL:uri
+  | KW_SHOW KW_GRANT KW_ROLE ident_or_default:role KW_ON uri_ident:uri_kw
+    STRING_LITERAL:uri
   {:
     RESULT = new ShowGrantRoleStmt(role,
         PrivilegeSpec.createUriScopedPriv(TPrivilegeLevel.ALL, new HdfsUri(uri)));
@@ -821,40 +838,40 @@ show_grant_role_stmt ::=
   ;
 
 create_drop_role_stmt ::=
-  KW_CREATE KW_ROLE IDENT:role
+  KW_CREATE KW_ROLE ident_or_default:role
   {: RESULT = new CreateDropRoleStmt(role, false); :}
-  | KW_DROP KW_ROLE IDENT:role
+  | KW_DROP KW_ROLE ident_or_default:role
   {: RESULT = new CreateDropRoleStmt(role, true); :}
   ;
 
 grant_role_stmt ::=
-  KW_GRANT KW_ROLE IDENT:role KW_TO KW_GROUP IDENT:group
+  KW_GRANT KW_ROLE ident_or_default:role KW_TO KW_GROUP ident_or_default:group
   {: RESULT = new GrantRevokeRoleStmt(role, group, true); :}
   ;
 
 revoke_role_stmt ::=
-  KW_REVOKE KW_ROLE IDENT:role KW_FROM KW_GROUP IDENT:group
+  KW_REVOKE KW_ROLE ident_or_default:role KW_FROM KW_GROUP ident_or_default:group
   {: RESULT = new GrantRevokeRoleStmt(role, group, false); :}
   ;
 
 grant_privilege_stmt ::=
-  KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role IDENT:role
+  KW_GRANT privilege_spec:priv KW_TO opt_kw_role:opt_role ident_or_default:role
   opt_with_grantopt:grant_opt
   {: RESULT = new GrantRevokePrivStmt(role, priv, true, grant_opt); :}
   ;
 
 revoke_privilege_stmt ::=
   KW_REVOKE opt_grantopt_for:grant_opt privilege_spec:priv KW_FROM
-  opt_kw_role:opt_role IDENT:role
+  opt_kw_role:opt_role ident_or_default:role
   {: RESULT = new GrantRevokePrivStmt(role, priv, false, grant_opt); :}
   ;
 
 privilege_spec ::=
   privilege:priv KW_ON server_ident:server_kw
   {: RESULT = PrivilegeSpec.createServerScopedPriv(priv); :}
-  | privilege:priv KW_ON server_ident:server_kw IDENT:server_name
+  | privilege:priv KW_ON server_ident:server_kw ident_or_default:server_name
   {: RESULT = PrivilegeSpec.createServerScopedPriv(priv, server_name); :}
-  | privilege:priv KW_ON KW_DATABASE IDENT:db_name
+  | privilege:priv KW_ON KW_DATABASE ident_or_default:db_name
   {: RESULT = PrivilegeSpec.createDbScopedPriv(priv, db_name); :}
   | privilege:priv KW_ON KW_TABLE table_name:tbl_name
   {: RESULT = PrivilegeSpec.createTableScopedPriv(priv, tbl_name); :}
@@ -902,9 +919,9 @@ alter_tbl_stmt ::=
     RESULT = new AlterTableAddPartitionStmt(table, partition,
         location, if_not_exists, cache_op);
   :}
-  | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column IDENT:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_DROP opt_kw_column ident_or_default:col_name
   {: RESULT = new AlterTableDropColStmt(table, col_name); :}
-  | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column IDENT:col_name
+  | KW_ALTER KW_TABLE table_name:table KW_CHANGE opt_kw_column ident_or_default:col_name
     column_def:col_def
   {: RESULT = new AlterTableChangeColStmt(table, col_name, col_def); :}
   | KW_ALTER KW_TABLE table_name:table KW_DROP if_exists_val:if_exists
@@ -927,7 +944,7 @@ alter_tbl_stmt ::=
     table_property_type:target LPAREN properties_map:properties RPAREN
   {: RESULT = new AlterTableSetTblProperties(table, partitions, target, properties); :}
   | KW_ALTER KW_TABLE table_name:table opt_partition_set:partition KW_SET
-    KW_COLUMN KW_STATS IDENT:col LPAREN properties_map:map RPAREN
+    KW_COLUMN KW_STATS ident_or_default:col LPAREN properties_map:map RPAREN
   {:
     // The opt_partition_set is used to avoid conflicts even though
     // a partition clause does not make sense for this stmt. If a partition
@@ -966,8 +983,8 @@ replace_existing_cols_val ::=
   ;
 
 create_db_stmt ::=
-  KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists IDENT:db_name
-  comment_val:comment location_val:location
+  KW_CREATE db_or_schema_kw if_not_exists_val:if_not_exists ident_or_default:db_name
+  opt_comment_val:comment location_val:location
   {: RESULT = new CreateDbStmt(db_name, comment, location, if_not_exists); :}
   ;
 
@@ -1032,9 +1049,9 @@ create_tbl_stmt ::=
     RESULT = new CreateTableStmt(tbl_def);
   :}
   | tbl_def_with_col_defs:tbl_def
-    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id IDENT:data_src_name
+    KW_PRODUCED KW_BY KW_DATA source_ident:is_source_id ident_or_default:data_src_name
     opt_init_string_val:init_string
-    comment_val:comment
+    opt_comment_val:comment
   {:
     // Need external_val in the grammar to avoid shift/reduce conflict with other
     // CREATE TABLE statements.
@@ -1067,7 +1084,7 @@ create_tbl_stmt ::=
 create_tbl_like_stmt ::=
   tbl_def_without_col_defs:tbl_def
   KW_LIKE table_name:other_table
-  comment_val:comment
+  opt_comment_val:comment
   file_format_create_table_val:file_format location_val:location
   {:
     RESULT = new CreateTableLikeStmt(tbl_def.getTblName(), other_table,
@@ -1089,7 +1106,8 @@ tbl_def_with_col_defs ::=
     tbl_def.getColumnDefs().addAll(list);
     RESULT = tbl_def;
   :}
-  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA primary_keys:primary_keys RPAREN
+  | tbl_def_without_col_defs:tbl_def LPAREN column_def_list:list COMMA
+    primary_keys:primary_keys RPAREN
   {:
     tbl_def.getColumnDefs().addAll(list);
     tbl_def.getPrimaryKeyColumnNames().addAll(primary_keys);
@@ -1103,7 +1121,7 @@ primary_keys ::=
   ;
 
 tbl_options ::=
-  comment_val:comment row_format_val:row_format serde_properties:serde_props
+  opt_comment_val:comment row_format_val:row_format serde_properties:serde_props
   file_format_create_table_val:file_format location_val:location cache_op_val:cache_op
   tbl_properties:tbl_props
   {:
@@ -1282,6 +1300,11 @@ opt_cache_op_replication ::=
 comment_val ::=
   KW_COMMENT STRING_LITERAL:comment
   {: RESULT = comment; :}
+  ;
+
+opt_comment_val ::=
+  KW_COMMENT STRING_LITERAL:comment
+  {: RESULT = comment; :}
   | /* empty */
   {: RESULT = null; :}
   ;
@@ -1422,20 +1445,81 @@ column_def_list ::=
   ;
 
 column_def ::=
-  IDENT:col_name type_def:type is_primary_key_val:primary_key comment_val:comment
-  {: RESULT = new ColumnDef(col_name, type, primary_key, comment); :}
+  ident_or_default:col_name type_def:type column_options_map:options
+  {: RESULT = new ColumnDef(col_name, type, options); :}
+  | ident_or_default:col_name type_def:type
+  {: RESULT = new ColumnDef(col_name, type); :}
+  ;
+
+column_options_map ::=
+  column_options_map:map column_option:col_option
+  {:
+    if (map.put(col_option.first, col_option.second) != null) {
+      throw new Exception(String.format("Column option %s is specified multiple times",
+          col_option.first.toString()));
+    }
+    RESULT = map;
+  :}
+  | column_option:col_option
+  {:
+    Map<Option, Object> options = Maps.newHashMap();
+    options.put(col_option.first, col_option.second);
+    RESULT = options;
+  :}
+  ;
+
+column_option ::=
+  is_primary_key_val:primary_key
+  {: RESULT = new Pair<Option, Object>(Option.IS_PRIMARY_KEY, primary_key); :}
+  | nullability_val:nullability
+  {: RESULT = new Pair<Option, Object>(Option.IS_NULLABLE, nullability); :}
+  | encoding_val:encoding
+  {: RESULT = new Pair<Option, Object>(Option.ENCODING, encoding); :}
+  | compression_val:compression
+  {: RESULT = new Pair<Option, Object>(Option.COMPRESSION, compression); :}
+  | default_val:default_val
+  {: RESULT = new Pair<Option, Object>(Option.DEFAULT, default_val); :}
+  | block_size_val:block_size
+  {: RESULT = new Pair<Option, Object>(Option.BLOCK_SIZE, block_size); :}
+  | comment_val:comment
+  {: RESULT = new Pair<Option, Object>(Option.COMMENT, comment); :}
   ;
 
 is_primary_key_val ::=
   KW_PRIMARY key_ident
   {: RESULT = true; :}
-  | /* empty */
+  ;
+
+nullability_val ::=
+  KW_NOT KW_NULL
   {: RESULT = false; :}
+  | KW_NULL
+  {: RESULT = true; :}
+  ;
+
+encoding_val ::=
+  KW_ENCODING ident_or_default:encoding_ident
+  {: RESULT = encoding_ident; :}
+  ;
+
+compression_val ::=
+  KW_COMPRESSION ident_or_default:compression_ident
+  {: RESULT = compression_ident; :}
+  ;
+
+default_val ::=
+  KW_DEFAULT expr:default_val
+  {: RESULT = default_val; :}
+  ;
+
+block_size_val ::=
+  KW_BLOCKSIZE literal:block_size
+  {: RESULT = block_size; :}
   ;
 
 create_view_stmt ::=
   KW_CREATE KW_VIEW if_not_exists_val:if_not_exists table_name:view_name
-  view_column_defs:col_defs comment_val:comment KW_AS query_stmt:view_def
+  view_column_defs:col_defs opt_comment_val:comment KW_AS query_stmt:view_def
   {:
     RESULT = new CreateViewStmt(if_not_exists, view_name, col_defs, comment, view_def);
   :}
@@ -1443,7 +1527,7 @@ create_view_stmt ::=
 
 create_data_src_stmt ::=
   KW_CREATE KW_DATA source_ident:is_source_id
-  if_not_exists_val:if_not_exists IDENT:data_src_name
+  if_not_exists_val:if_not_exists ident_or_default:data_src_name
   KW_LOCATION STRING_LITERAL:location
   KW_CLASS STRING_LITERAL:class_name
   KW_API_VERSION STRING_LITERAL:api_version
@@ -1534,8 +1618,12 @@ view_column_def_list ::=
   ;
 
 view_column_def ::=
-  IDENT:col_name comment_val:comment
-  {: RESULT = new ColumnDef(col_name, null, comment); :}
+  ident_or_default:col_name opt_comment_val:comment
+  {:
+    Map<Option, Object> options = Maps.newHashMap();
+    if (comment != null) options.put(Option.COMMENT, comment);
+    RESULT = new ColumnDef(col_name, null, options);
+  :}
   ;
 
 alter_view_stmt ::=
@@ -1571,7 +1659,8 @@ drop_stats_stmt ::=
   ;
 
 drop_db_stmt ::=
-  KW_DROP db_or_schema_kw if_exists_val:if_exists IDENT:db_name cascade_val:cascade
+  KW_DROP db_or_schema_kw if_exists_val:if_exists ident_or_default:db_name
+  cascade_val:cascade
   {: RESULT = new DropDbStmt(db_name, if_exists, cascade); :}
   ;
 
@@ -1593,7 +1682,8 @@ drop_function_stmt ::=
   ;
 
 drop_data_src_stmt ::=
-  KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists IDENT:data_src_name
+  KW_DROP KW_DATA source_ident:is_source_id if_exists_val:if_exists
+  ident_or_default:data_src_name
   {: RESULT = new DropDataSrcStmt(data_src_name, if_exists); :}
   ;
 
@@ -1683,7 +1773,7 @@ static_partition_key_value_list ::=
 
 partition_key_value ::=
   // Dynamic partition key values.
-  IDENT:column
+  ident_or_default:column
   {: RESULT = new PartitionKeyValue(column, null); :}
   | static_partition_key_value:partition
   {: RESULT = partition; :}
@@ -1691,7 +1781,7 @@ partition_key_value ::=
 
 static_partition_key_value ::=
   // Static partition key values.
-  IDENT:column EQUAL expr:e
+  ident_or_default:column EQUAL expr:e
   {: RESULT = new PartitionKeyValue(column, e); :}
   ;
 
@@ -1825,11 +1915,12 @@ opt_with_clause ::=
   ;
 
 with_view_def ::=
-  IDENT:alias KW_AS LPAREN query_stmt:query RPAREN
+  ident_or_default:alias KW_AS LPAREN query_stmt:query RPAREN
   {: RESULT = new View(alias, query, null); :}
   | STRING_LITERAL:alias KW_AS LPAREN query_stmt:query RPAREN
   {: RESULT = new View(alias, query, null); :}
-  | IDENT:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN query_stmt:query RPAREN
+  | ident_or_default:alias LPAREN ident_list:col_names RPAREN KW_AS LPAREN
+    query_stmt:query RPAREN
   {: RESULT = new View(alias, query, col_names); :}
   | STRING_LITERAL:alias LPAREN ident_list:col_names RPAREN
     KW_AS LPAREN query_stmt:query RPAREN
@@ -1957,7 +2048,7 @@ values_operand_list ::=
   ;
 
 use_stmt ::=
-  KW_USE IDENT:db
+  KW_USE ident_or_default:db
   {: RESULT = new UseStmt(db); :}
   ;
 
@@ -1966,9 +2057,9 @@ show_tables_stmt ::=
   {: RESULT = new ShowTablesStmt(); :}
   | KW_SHOW KW_TABLES show_pattern:showPattern
   {: RESULT = new ShowTablesStmt(showPattern); :}
-  | KW_SHOW KW_TABLES KW_IN IDENT:db
+  | KW_SHOW KW_TABLES KW_IN ident_or_default:db
   {: RESULT = new ShowTablesStmt(db, null); :}
-  | KW_SHOW KW_TABLES KW_IN IDENT:db show_pattern:showPattern
+  | KW_SHOW KW_TABLES KW_IN ident_or_default:db show_pattern:showPattern
   {: RESULT = new ShowTablesStmt(db, showPattern); :}
   ;
 
@@ -1996,9 +2087,9 @@ show_functions_stmt ::=
   {: RESULT = new ShowFunctionsStmt(null, null, fn_type); :}
   | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS show_pattern:showPattern
   {: RESULT = new ShowFunctionsStmt(null, showPattern, fn_type); :}
-  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db
+  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db
   {: RESULT = new ShowFunctionsStmt(db, null, fn_type); :}
-  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN IDENT:db
+  | KW_SHOW opt_function_category:fn_type KW_FUNCTIONS KW_IN ident_or_default:db
       show_pattern:showPattern
   {: RESULT = new ShowFunctionsStmt(db, showPattern, fn_type); :}
   ;
@@ -2051,7 +2142,7 @@ show_files_stmt ::=
   ;
 
 describe_db_stmt ::=
-  KW_DESCRIBE db_or_schema_kw describe_output_style:style IDENT:db
+  KW_DESCRIBE db_or_schema_kw describe_output_style:style ident_or_default:db
   {: RESULT = new DescribeDbStmt(db, style); :}
   ;
 
@@ -2108,9 +2199,9 @@ select_clause ::=
   ;
 
 set_stmt ::=
-  KW_SET IDENT:key EQUAL literal:l
+  KW_SET ident_or_default:key EQUAL literal:l
   {: RESULT = new SetStmt(key, l.getStringValue()); :}
-  | KW_SET IDENT:key EQUAL IDENT:ident
+  | KW_SET ident_or_default:key EQUAL ident_or_default:ident
   {: RESULT = new SetStmt(key, ident); :}
   | KW_SET
   {: RESULT = new SetStmt(null, null); :}
@@ -2140,9 +2231,9 @@ select_list_item ::=
   ;
 
 alias_clause ::=
-  KW_AS IDENT:ident
+  KW_AS ident_or_default:ident
   {: RESULT = ident; :}
-  | IDENT:ident
+  | ident_or_default:ident
   {: RESULT = ident; :}
   | KW_AS STRING_LITERAL:l
   {: RESULT = l; :}
@@ -2158,9 +2249,9 @@ star_expr ::=
   ;
 
 table_name ::=
-  IDENT:tbl
+  ident_or_default:tbl
   {: RESULT = new TableName(null, tbl); :}
-  | IDENT:db DOT IDENT:tbl
+  | ident_or_default:db DOT ident_or_default:tbl
   {: RESULT = new TableName(db, tbl); :}
   ;
 
@@ -2295,13 +2386,13 @@ opt_plan_hints ::=
   ;
 
 ident_list ::=
-  IDENT:ident
+  ident_or_default:ident
   {:
     ArrayList<String> list = new ArrayList<String>();
     list.add(ident);
     RESULT = list;
   :}
-  | ident_list:list COMMA IDENT:ident
+  | ident_list:list COMMA ident_or_default:ident
   {:
     list.add(ident);
     RESULT = list;
@@ -2525,7 +2616,7 @@ function_call_expr ::=
   | function_name:fn_name LPAREN function_params:params RPAREN
   {: RESULT = FunctionCallExpr.createExpr(fn_name, params); :}
   // Below is a special case for EXTRACT. Idents are used to avoid adding new keywords.
-  | function_name:fn_name LPAREN IDENT:u KW_FROM expr:t RPAREN
+  | function_name:fn_name LPAREN ident_or_default:u KW_FROM expr:t RPAREN
   {:  RESULT = new ExtractFromExpr(fn_name, u, t); :}
   ;
 
@@ -2828,13 +2919,13 @@ slot_ref ::=
   ;
 
 dotted_path ::=
-  IDENT:ident
+  ident_or_default:ident
   {:
     ArrayList<String> list = new ArrayList<String>();
     list.add(ident);
     RESULT = list;
   :}
-  | dotted_path:list DOT IDENT:ident
+  | dotted_path:list DOT ident_or_default:ident
   {:
     list.add(ident);
     RESULT = list;
@@ -2895,7 +2986,7 @@ type ::=
 // that we can parse type strings from the Hive Metastore which
 // may have unquoted identifiers corresponding to keywords.
 struct_field_def ::=
-  ident_or_keyword:name COLON type:t comment_val:comment
+  ident_or_keyword:name COLON type:t opt_comment_val:comment
   {: RESULT = new StructField(name, t, comment); :}
   ;
 
@@ -2913,6 +3004,13 @@ struct_field_def_list ::=
   :}
   ;
 
+ident_or_default ::=
+  IDENT:name
+  {: RESULT = name.toString(); :}
+  | KW_DEFAULT:name
+  {: RESULT = name.toString(); :}
+  ;
+
 ident_or_keyword ::=
   IDENT:r
   {: RESULT = r.toString(); :}
@@ -2946,6 +3044,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_BINARY:r
   {: RESULT = r.toString(); :}
+  | KW_BLOCKSIZE:r
+  {: RESULT = r.toString(); :}
   | KW_BOOLEAN:r
   {: RESULT = r.toString(); :}
   | KW_BUCKETS:r
@@ -2974,6 +3074,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_COMMENT:r
   {: RESULT = r.toString(); :}
+  | KW_COMPRESSION:r
+  {: RESULT = r.toString(); :}
   | KW_COMPUTE:r
   {: RESULT = r.toString(); :}
   | KW_CREATE:r
@@ -2994,6 +3096,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_DECIMAL:r
   {: RESULT = r.toString(); :}
+  | KW_DEFAULT:r
+  {: RESULT = r.toString(); :}
   | KW_DELETE:r
   {: RESULT = r.toString(); :}
   | KW_DELIMITED:r
@@ -3014,6 +3118,8 @@ ident_or_keyword ::=
   {: RESULT = r.toString(); :}
   | KW_ELSE:r
   {: RESULT = r.toString(); :}
+  | KW_ENCODING:r
+  {: RESULT = r.toString(); :}
   | KW_END:r
   {: RESULT = r.toString(); :}
   | KW_ESCAPED:r

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
index 0354117..feda138 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableAddReplaceColsStmt.java
@@ -90,7 +90,7 @@ public class AlterTableAddReplaceColsStmt extends AlterTableStmt {
     // partition columns.
     Set<String> colNames = Sets.newHashSet();
     for (ColumnDef c: columnDefs_) {
-      c.analyze();
+      c.analyze(analyzer);
       String colName = c.getColName().toLowerCase();
       if (existingPartitionKeys.contains(colName)) {
         throw new AnalysisException(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java b/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
index 9130740..5c4bfee 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AlterTableChangeColStmt.java
@@ -90,7 +90,7 @@ public class AlterTableChangeColStmt extends AlterTableStmt {
     }
 
     // Check that the new column def's name is valid.
-    newColDef_.analyze();
+    newColDef_.analyze(analyzer);
     // Verify that if the column name is being changed, the new name doesn't conflict
     // with an existing column.
     if (!colName_.toLowerCase().equals(newColDef_.getColName().toLowerCase()) &&

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
index 923a0a6..f65aa27 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ColumnDef.java
@@ -18,21 +18,26 @@
 package org.apache.impala.analysis;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
 import com.google.common.base.Function;
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.util.KuduUtil;
 import org.apache.impala.util.MetaStoreUtil;
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
+import org.apache.kudu.ColumnSchema.Encoding;
 
 /**
  * Represents a column definition in a CREATE/ALTER TABLE/VIEW statement.
@@ -40,31 +45,89 @@ import org.apache.impala.util.MetaStoreUtil;
  * whereas column definitions in CREATE/ALTER VIEW statements infer the column type from
  * the corresponding view definition. All column definitions have an optional comment.
  * Since a column definition refers a column stored in the Metastore, the column name
- * must be valid according to the Metastore's rules (see @MetaStoreUtils).
+ * must be valid according to the Metastore's rules (see @MetaStoreUtils). A number of
+ * additional column options may be specified for Kudu tables.
  */
 public class ColumnDef {
   private final String colName_;
-  private String comment_;
-
   // Required in CREATE/ALTER TABLE stmts. Set to NULL in CREATE/ALTER VIEW stmts,
   // for which we setType() after analyzing the defining view definition stmt.
   private final TypeDef typeDef_;
   private Type type_;
+  private String comment_;
 
-  // Set to true if the user specified "PRIMARY KEY" in the column definition. Kudu table
-  // definitions may use this.
-  private boolean isPrimaryKey_;
-
-  public ColumnDef(String colName, TypeDef typeDef, String comment) {
-    this(colName, typeDef, false, comment);
+  // Available column options
+  public enum Option {
+    IS_PRIMARY_KEY,
+    IS_NULLABLE,
+    ENCODING,
+    COMPRESSION,
+    DEFAULT,
+    BLOCK_SIZE,
+    COMMENT
   }
 
-  public ColumnDef(String colName, TypeDef typeDef, boolean isPrimaryKey,
-      String comment) {
+  // Kudu-specific column options
+  //
+  // Set to true if the user specified "PRIMARY KEY" in the column definition.
+  private boolean isPrimaryKey_;
+  // Set to true if this column may contain null values. Can be NULL if
+  // not specified.
+  private Boolean isNullable_;
+  private String encodingVal_;
+  // Encoding for this column; set in analysis.
+  private Encoding encoding_;
+  private String compressionVal_;
+  // Compression algorithm for this column; set in analysis.
+  private CompressionAlgorithm compression_;
+  // Default value for this column.
+  private Expr defaultValue_;
+  // Desired block size for this column.
+  private LiteralExpr blockSize_;
+
+  public ColumnDef(String colName, TypeDef typeDef, Map<Option, Object> options) {
+    Preconditions.checkNotNull(options);
     colName_ = colName.toLowerCase();
     typeDef_ = typeDef;
-    isPrimaryKey_ = isPrimaryKey;
-    comment_ = comment;
+    for (Map.Entry<Option, Object> option: options.entrySet()) {
+      switch (option.getKey()) {
+        case IS_PRIMARY_KEY:
+          Preconditions.checkState(option.getValue() instanceof Boolean);
+          isPrimaryKey_ = (Boolean) option.getValue();
+          break;
+        case IS_NULLABLE:
+          Preconditions.checkState(option.getValue() instanceof Boolean);
+          isNullable_ = (Boolean) option.getValue();
+          break;
+        case ENCODING:
+          Preconditions.checkState(option.getValue() instanceof String);
+          encodingVal_ = ((String) option.getValue()).toUpperCase();
+          break;
+        case COMPRESSION:
+          Preconditions.checkState(option.getValue() instanceof String);
+          compressionVal_ = ((String) option.getValue()).toUpperCase();
+          break;
+        case DEFAULT:
+          Preconditions.checkState(option.getValue() instanceof Expr);
+          defaultValue_ = (Expr) option.getValue();
+          break;
+        case BLOCK_SIZE:
+          Preconditions.checkState(option.getValue() instanceof LiteralExpr);
+          blockSize_ = (LiteralExpr) option.getValue();
+          break;
+        case COMMENT:
+          Preconditions.checkState(option.getValue() instanceof String);
+          comment_ = (String) option.getValue();
+          break;
+        default:
+          throw new IllegalStateException(String.format("Illegal option %s",
+              option.getKey()));
+      }
+    }
+  }
+
+  public ColumnDef(String colName, TypeDef typeDef) {
+    this(colName, typeDef, Collections.<Option, Object>emptyMap());
   }
 
   /**
@@ -81,8 +144,7 @@ public class ColumnDef {
     colName_ = fs.getName();
     typeDef_ = new TypeDef(type);
     comment_ = fs.getComment();
-    isPrimaryKey_ = false;
-    analyze();
+    analyze(null);
   }
 
   public String getColName() { return colName_; }
@@ -92,8 +154,13 @@ public class ColumnDef {
   boolean isPrimaryKey() { return isPrimaryKey_; }
   public void setComment(String comment) { comment_ = comment; }
   public String getComment() { return comment_; }
+  public boolean hasKuduOptions() {
+    return isPrimaryKey_ || isNullable_ != null || encodingVal_ != null
+        || compressionVal_ != null || defaultValue_ != null || blockSize_ != null;
+  }
+  public boolean isNullable() { return isNullable_ != null && isNullable_; }
 
-  public void analyze() throws AnalysisException {
+  public void analyze(Analyzer analyzer) throws AnalysisException {
     // Check whether the column name meets the Metastore's requirements.
     if (!MetaStoreUtils.validateName(colName_)) {
       throw new AnalysisException("Invalid column/field name: " + colName_);
@@ -112,6 +179,10 @@ public class ColumnDef {
           "%s has %d characters.", colName_, MetaStoreUtil.MAX_TYPE_NAME_LENGTH,
           typeSql, typeSql.length()));
     }
+    if (hasKuduOptions()) {
+      Preconditions.checkNotNull(analyzer);
+      analyzeKuduOptions(analyzer);
+    }
     if (comment_ != null &&
         comment_.length() > MetaStoreUtil.CREATE_MAX_COMMENT_LENGTH) {
       throw new AnalysisException(String.format(
@@ -121,6 +192,79 @@ public class ColumnDef {
     }
   }
 
+  private void analyzeKuduOptions(Analyzer analyzer) throws AnalysisException {
+    if (isPrimaryKey_ && isNullable_ != null && isNullable_) {
+      throw new AnalysisException("Primary key columns cannot be nullable: " +
+          toString());
+    }
+    // Encoding value
+    if (encodingVal_ != null) {
+      try {
+        encoding_ = Encoding.valueOf(encodingVal_);
+      } catch (IllegalArgumentException e) {
+        throw new AnalysisException(String.format("Unsupported encoding value '%s'. " +
+            "Supported encoding values are: %s", encodingVal_,
+            Joiner.on(", ").join(Encoding.values())));
+      }
+    }
+    // Compression algorithm
+    if (compressionVal_ != null) {
+      try {
+        compression_ = CompressionAlgorithm.valueOf(compressionVal_);
+      } catch (IllegalArgumentException e) {
+        throw new AnalysisException(String.format("Unsupported compression " +
+            "algorithm '%s'. Supported compression algorithms are: %s", compressionVal_,
+            Joiner.on(", ").join(CompressionAlgorithm.values())));
+      }
+    }
+    // Analyze the default value, if any.
+    // TODO: Similar checks are applied for range partition values in
+    // RangePartition.analyzeBoundaryValue(). Consider consolidating the logic into a
+    // single function.
+    if (defaultValue_ != null) {
+      try {
+        defaultValue_.analyze(analyzer);
+      } catch (AnalysisException e) {
+        throw new AnalysisException(String.format("Only constant values are allowed " +
+            "for default values: %s", defaultValue_.toSql()), e);
+      }
+      if (!defaultValue_.isConstant()) {
+        throw new AnalysisException(String.format("Only constant values are allowed " +
+            "for default values: %s", defaultValue_.toSql()));
+      }
+      defaultValue_ = LiteralExpr.create(defaultValue_, analyzer.getQueryCtx());
+      if (defaultValue_ == null) {
+        throw new AnalysisException(String.format("Only constant values are allowed " +
+            "for default values: %s", defaultValue_.toSql()));
+      }
+      if (defaultValue_.getType().isNull() && ((isNullable_ != null && !isNullable_)
+          || isPrimaryKey_)) {
+        throw new AnalysisException(String.format("Default value of NULL not allowed " +
+            "on non-nullable column: '%s'", getColName()));
+      }
+      if (!Type.isImplicitlyCastable(defaultValue_.getType(), type_, true)) {
+        throw new AnalysisException(String.format("Default value %s (type: %s) " +
+            "is not compatible with column '%s' (type: %s).", defaultValue_.toSql(),
+            defaultValue_.getType().toSql(), colName_, type_.toSql()));
+      }
+      if (!defaultValue_.getType().equals(type_)) {
+        Expr castLiteral = defaultValue_.uncheckedCastTo(type_);
+        Preconditions.checkNotNull(castLiteral);
+        defaultValue_ = LiteralExpr.create(castLiteral, analyzer.getQueryCtx());
+      }
+      Preconditions.checkNotNull(defaultValue_);
+    }
+
+    // Analyze the block size value, if any.
+    if (blockSize_ != null) {
+      blockSize_.analyze(null);
+      if (!blockSize_.getType().isIntegerType()) {
+        throw new AnalysisException(String.format("Invalid value for BLOCK_SIZE: %s. " +
+            "A positive INTEGER value is expected.", blockSize_.toSql()));
+      }
+    }
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder(colName_).append(" ");
@@ -130,6 +274,11 @@ public class ColumnDef {
       sb.append(typeDef_);
     }
     if (isPrimaryKey_) sb.append(" PRIMARY KEY");
+    if (isNullable_ != null) sb.append(isNullable_ ? " NULL" : " NOT NULL");
+    if (encoding_ != null) sb.append(" ENCODING " + encoding_.toString());
+    if (compression_ != null) sb.append(" COMPRESSION " + compression_.toString());
+    if (defaultValue_ != null) sb.append(" DEFAULT_VALUE " + defaultValue_.toSql());
+    if (blockSize_ != null) sb.append(" BLOCK_SIZE " + blockSize_.toSql());
     if (comment_ != null) sb.append(String.format(" COMMENT '%s'", comment_));
     return sb.toString();
   }
@@ -146,12 +295,21 @@ public class ColumnDef {
         .append(isPrimaryKey_, rhs.isPrimaryKey_)
         .append(typeDef_, rhs.typeDef_)
         .append(type_, rhs.type_)
+        .append(isNullable_, rhs.isNullable_)
+        .append(encoding_, rhs.encoding_)
+        .append(compression_, rhs.compression_)
+        .append(defaultValue_, rhs.defaultValue_)
+        .append(blockSize_, rhs.blockSize_)
         .isEquals();
   }
 
   public TColumn toThrift() {
-    TColumn col = new TColumn(new TColumn(getColName(), type_.toThrift()));
-    col.setComment(getComment());
+    TColumn col = new TColumn(getColName(), type_.toThrift());
+    Integer blockSize =
+        blockSize_ == null ? null : (int) ((NumericLiteral) blockSize_).getIntValue();
+    KuduUtil.setColumnOptions(col, isPrimaryKey_, isNullable_, encoding_,
+        compression_, defaultValue_, blockSize);
+    if (comment_ != null) col.setComment(comment_);
     return col;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
index 54b8557..5f524f5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateOrAlterViewStmtBase.java
@@ -18,6 +18,7 @@
 package org.apache.impala.analysis;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -114,7 +115,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase {
       List<String> labels = viewDefStmt_.getColLabels();
       Preconditions.checkState(exprs.size() == labels.size());
       for (int i = 0; i < viewDefStmt_.getColLabels().size(); ++i) {
-        ColumnDef colDef = new ColumnDef(labels.get(i), null, null);
+        ColumnDef colDef = new ColumnDef(labels.get(i), null);
         colDef.setType(exprs.get(i).getType());
         finalColDefs_.add(colDef);
       }
@@ -124,7 +125,7 @@ public abstract class CreateOrAlterViewStmtBase extends StatementBase {
     // duplicate column names.
     Set<String> distinctColNames = Sets.newHashSet();
     for (ColumnDef colDesc: finalColDefs_) {
-      colDesc.analyze();
+      colDesc.analyze(null);
       if (!distinctColNames.add(colDesc.getColName().toLowerCase())) {
         throw new AnalysisException("Duplicate column name: " + colDesc.getColName());
       }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
index 1e53d1e..5dca6b5 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableAsSelectStmt.java
@@ -17,8 +17,9 @@
 
 package org.apache.impala.analysis;
 
-import java.util.EnumSet;
+import java.util.Collections;
 import java.util.List;
+import java.util.EnumSet;
 
 import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.Db;
@@ -147,7 +148,7 @@ public class CreateTableAsSelectStmt extends StatementBase {
               "mismatch: %s != %s", partitionLabel, colLabel));
         }
 
-        ColumnDef colDef = new ColumnDef(colLabel, null, null);
+        ColumnDef colDef = new ColumnDef(colLabel, null);
         colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
         createStmt_.getPartitionColumnDefs().add(colDef);
       }
@@ -159,8 +160,8 @@ public class CreateTableAsSelectStmt extends StatementBase {
     int colCnt = tmpQueryStmt.getColLabels().size();
     createStmt_.getColumnDefs().clear();
     for (int i = 0; i < colCnt; ++i) {
-      ColumnDef colDef = new ColumnDef(
-          tmpQueryStmt.getColLabels().get(i), null, null);
+      ColumnDef colDef = new ColumnDef(tmpQueryStmt.getColLabels().get(i), null,
+          Collections.<ColumnDef.Option, Object>emptyMap());
       colDef.setType(tmpQueryStmt.getBaseTblResultExprs().get(i).getType());
       createStmt_.getColumnDefs().add(colDef);
     }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
index a653323..432e9c1 100644
--- a/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/CreateTableLikeFileStmt.java
@@ -21,9 +21,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -36,7 +38,6 @@ import org.apache.impala.authorization.Privilege;
 import org.apache.impala.catalog.ArrayType;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
-import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.MapType;
 import org.apache.impala.catalog.ScalarType;
 import org.apache.impala.catalog.StructField;
@@ -328,8 +329,9 @@ public class CreateTableLikeFileStmt extends CreateTableStmt {
       Type type = convertParquetType(field);
       Preconditions.checkNotNull(type);
       String colName = field.getName();
-      schema.add(new ColumnDef(colName, new TypeDef(type),
-          "Inferred from Parquet file."));
+      Map<ColumnDef.Option, Object> option = Maps.newHashMap();
+      option.put(ColumnDef.Option.COMMENT, "Inferred from Parquet file.");
+      schema.add(new ColumnDef(colName, new TypeDef(type), option));
     }
     return schema;
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
index 55005f9..87f7cef 100644
--- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
+++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java
@@ -27,6 +27,7 @@ import org.apache.impala.authorization.PrivilegeRequestBuilder;
 import org.apache.impala.catalog.Column;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsTable;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.Table;
 import org.apache.impala.catalog.Type;
@@ -41,7 +42,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -126,18 +126,20 @@ public class InsertStmt extends StatementBase {
 
   // Output expressions that produce the final results to write to the target table. May
   // include casts. Set in prepareExpressions().
-  // If this is an INSERT, will contain one Expr for all non-partition columns of the
-  // target table with NullLiterals where an output column isn't explicitly mentioned.
-  // The i'th expr produces the i'th column of the target table.
-  // If this is an UPSERT, will contain one Expr per column mentioned in the query and
-  // mentionedUpsertColumns_ is used to map between the Exprs and columns in the target
-  // table.
+  // If this is an INSERT on a non-Kudu table, it will contain one Expr for all
+  // non-partition columns of the target table with NullLiterals where an output
+  // column isn't explicitly mentioned. The i'th expr produces the i'th column of
+  // the target table.
+  //
+  // For Kudu tables (INSERT and UPSERT operations), it will contain one Expr per column
+  // mentioned in the query and mentionedColumns_ is used to map between the Exprs
+  // and columns in the target table.
   private ArrayList<Expr> resultExprs_ = Lists.newArrayList();
 
   // Position mapping of exprs in resultExprs_ to columns in the target table -
-  // resultExprs_[i] produces the mentionedUpsertColumns_[i] column of the target table.
-  // Only used for UPSERT, set in prepareExpressions().
-  private final List<Integer> mentionedUpsertColumns_ = Lists.newArrayList();
+  // resultExprs_[i] produces the mentionedColumns_[i] column of the target table.
+  // Only used for Kudu tables, set in prepareExpressions().
+  private final List<Integer> mentionedColumns_ = Lists.newArrayList();
 
   // Set in analyze(). Exprs corresponding to key columns of Kudu tables. Empty for
   // non-Kudu tables.
@@ -209,7 +211,7 @@ public class InsertStmt extends StatementBase {
     hasNoShuffleHint_ = false;
     hasClusteredHint_ = false;
     resultExprs_.clear();
-    mentionedUpsertColumns_.clear();
+    mentionedColumns_.clear();
     primaryKeyExprs_.clear();
   }
 
@@ -277,8 +279,9 @@ public class InsertStmt extends StatementBase {
     // Finally, prepareExpressions analyzes the expressions themselves, and confirms that
     // they are type-compatible with the target columns. Where columns are not mentioned
     // (and by this point, we know that missing columns are not partition columns),
-    // prepareExpressions assigns them a NULL literal expressions, unless this is an
-    // UPSERT, in which case we don't want to overwrite unmentioned columns with NULL.
+    // prepareExpressions assigns them NULL literal expressions, unless the target is
+    // a Kudu table, in which case we don't want to overwrite unmentioned columns with
+    // NULL.
 
     // An null permutation clause is the same as listing all non-partition columns in
     // order.
@@ -602,7 +605,7 @@ public class InsertStmt extends StatementBase {
    *
    * 3. Populates resultExprs_ with type-compatible expressions, in Hive column order,
    * for all expressions in the select-list. Unmentioned columns are assigned NULL literal
-   * expressions, unless this is an UPSERT.
+   * expressions, unless the target is a Kudu table.
    *
    * 4. Result exprs for key columns of Kudu tables are stored in primaryKeyExprs_.
    *
@@ -667,6 +670,7 @@ public class InsertStmt extends StatementBase {
       expr.analyze(analyzer);
     }
 
+    boolean isKuduTable = table_ instanceof KuduTable;
     // Finally, 'undo' the permutation so that the selectListExprs are in Hive column
     // order, and add NULL expressions to all missing columns, unless this is an UPSERT.
     ArrayList<Column> columns = table_.getColumnsInHiveOrder();
@@ -676,23 +680,32 @@ public class InsertStmt extends StatementBase {
       for (int i = 0; i < selectListExprs.size(); ++i) {
         if (selectExprTargetColumns.get(i).getName().equals(tblColumn.getName())) {
           resultExprs_.add(selectListExprs.get(i));
-          if (isUpsert_) mentionedUpsertColumns_.add(col);
+          if (isKuduTable) mentionedColumns_.add(col);
           matchFound = true;
           break;
         }
       }
       // If no match is found, either the column is a clustering column with a static
       // value, or it was unmentioned and therefore should have a NULL select-list
-      // expression if this is an INSERT.
+      // expression if this is an INSERT and the target is not a Kudu table.
       if (!matchFound) {
-        if (tblColumn.getPosition() >= numClusteringCols && !isUpsert_) {
-          // Unmentioned non-clustering columns get NULL literals with the appropriate
-          // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
-          resultExprs_.add(NullLiteral.create(tblColumn.getType()));
+        if (tblColumn.getPosition() >= numClusteringCols) {
+          if (isKuduTable) {
+            Preconditions.checkState(tblColumn instanceof KuduColumn);
+            KuduColumn kuduCol = (KuduColumn) tblColumn;
+            if (!kuduCol.hasDefaultValue() && !kuduCol.isNullable()) {
+              throw new AnalysisException("Missing values for column that is not " +
+                  "nullable and has no default value " + kuduCol.getName());
+            }
+          } else {
+            // Unmentioned non-clustering columns get NULL literals with the appropriate
+            // target type because Parquet cannot handle NULL_TYPE (IMPALA-617).
+            resultExprs_.add(NullLiteral.create(tblColumn.getType()));
+          }
         }
       }
       // Store exprs for Kudu key columns.
-      if (matchFound && table_ instanceof KuduTable) {
+      if (matchFound && isKuduTable) {
         KuduTable kuduTable = (KuduTable) table_;
         if (kuduTable.isPrimaryKeyColumn(tblColumn.getName())) {
           primaryKeyExprs_.add(Iterables.getLast(resultExprs_));
@@ -779,9 +792,8 @@ public class InsertStmt extends StatementBase {
   public DataSink createDataSink() {
     // analyze() must have been called before.
     Preconditions.checkState(table_ != null);
-    Preconditions.checkState(isUpsert_ || mentionedUpsertColumns_.isEmpty());
     return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT,
-        partitionKeyExprs_, mentionedUpsertColumns_, overwrite_);
+        partitionKeyExprs_, mentionedColumns_, overwrite_);
   }
 
   /**
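
The behavioral core of the InsertStmt change is the unmentioned-column rule: for a
Kudu target, a column left out of the column permutation is resolved by Kudu itself
(as NULL or as the column default), so analysis only has to reject columns that can
satisfy neither; every other target still gets a typed NULL literal. A minimal,
self-contained sketch of that rule, with a plain Exception standing in for
AnalysisException and booleans standing in for the KuduColumn accessors used above:

    // Hedged sketch of the per-column rule in analyze(), not the patch itself.
    static void checkUnmentionedColumn(boolean isKuduTable, boolean isNullable,
        boolean hasDefaultValue, String colName) throws Exception {
      if (isKuduTable) {
        // Kudu fills in the value server-side, so only a column that is neither
        // nullable nor defaulted makes the INSERT unanalyzable.
        if (!isNullable && !hasDefaultValue) {
          throw new Exception("Missing values for column that is not nullable " +
              "and has no default value: " + colName);
        }
      } else {
        // Non-Kudu targets are padded with NullLiteral.create(type) because
        // Parquet cannot handle NULL_TYPE (IMPALA-617); see the hunk above.
      }
    }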

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/TableDef.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/TableDef.java b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
index ce08e36..1c16954 100644
--- a/fe/src/main/java/org/apache/impala/analysis/TableDef.java
+++ b/fe/src/main/java/org/apache/impala/analysis/TableDef.java
@@ -174,7 +174,7 @@ class TableDef {
     Preconditions.checkState(tableName_ != null && !tableName_.isEmpty());
     fqTableName_ = analyzer.getFqTableName(getTblName());
     fqTableName_.analyze();
-    analyzeColumnDefs();
+    analyzeColumnDefs(analyzer);
     analyzePrimaryKeys();
 
     if (analyzer.dbContainsTable(getTblName().getDb(), getTbl(), Privilege.CREATE)
@@ -194,16 +194,20 @@ class TableDef {
    * Analyzes table and partition column definitions, checking whether all column
    * names are unique.
    */
-  private void analyzeColumnDefs() throws AnalysisException {
+  private void analyzeColumnDefs(Analyzer analyzer) throws AnalysisException {
     Set<String> colNames = Sets.newHashSet();
     for (ColumnDef colDef: columnDefs_) {
-      colDef.analyze();
+      colDef.analyze(analyzer);
       if (!colNames.add(colDef.getColName().toLowerCase())) {
         throw new AnalysisException("Duplicate column name: " + colDef.getColName());
       }
+      if (getFileFormat() != THdfsFileFormat.KUDU && colDef.hasKuduOptions()) {
+        throw new AnalysisException(String.format("Unsupported column options for " +
+            "file format '%s': '%s'", getFileFormat().name(), colDef.toString()));
+      }
     }
     for (ColumnDef colDef: getPartitionColumnDefs()) {
-      colDef.analyze();
+      colDef.analyze(analyzer);
       if (!colDef.getType().supportsTablePartitioning()) {
         throw new AnalysisException(
             String.format("Type '%s' is not supported as partition-column type " +
@@ -247,6 +251,10 @@ class TableDef {
         throw new AnalysisException(String.format(
             "PRIMARY KEY column '%s' does not exist in the table", colName));
       }
+      if (colDef.isNullable()) {
+        throw new AnalysisException("Primary key columns cannot be nullable: " +
+            colDef.toString());
+      }
       primaryKeyColDefs_.add(colDef);
     }
   }
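
Two of the new TableDef checks are easiest to see from the statements they reject.
In the AnalysisError style of Impala's FE tests (the helper name and the exact SQL
are assumptions for illustration; the expected error strings come from the hunks
above):

    // Kudu-only column options on a non-Kudu file format:
    AnalysisError("create table t (x int encoding rle) stored as parquet",
        "Unsupported column options for file format 'PARQUET'");
    // A nullable column cannot be part of the primary key (Kudu-specific
    // CREATE TABLE clauses are elided from the statement for brevity):
    AnalysisError("create table t (x int null, primary key (x)) stored as kudu",
        "Primary key columns cannot be nullable");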

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
index aa24336..f01a78c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
+++ b/fe/src/main/java/org/apache/impala/analysis/ToSqlUtils.java
@@ -41,6 +41,7 @@ import org.apache.impala.catalog.Function;
 import org.apache.impala.catalog.HBaseTable;
 import org.apache.impala.catalog.HdfsCompression;
 import org.apache.impala.catalog.HdfsFileFormat;
+import org.apache.impala.catalog.KuduColumn;
 import org.apache.impala.catalog.KuduTable;
 import org.apache.impala.catalog.RowFormat;
 import org.apache.impala.catalog.Table;
@@ -351,6 +352,21 @@ public class ToSqlUtils {
   private static String columnToSql(Column col) {
     StringBuilder sb = new StringBuilder(col.getName());
     if (col.getType() != null) sb.append(" " + col.getType().toSql());
+    if (col instanceof KuduColumn) {
+      KuduColumn kuduCol = (KuduColumn) col;
+      boolean isNullable = kuduCol.isNullable();
+      sb.append(isNullable ? " NULL" : " NOT NULL");
+      if (kuduCol.getEncoding() != null) sb.append(" ENCODING " + kuduCol.getEncoding());
+      if (kuduCol.getCompression() != null) {
+        sb.append(" COMPRESSION " + kuduCol.getCompression());
+      }
+      if (kuduCol.getDefaultValue() != null) {
+        sb.append(" DEFAULT " + kuduCol.getDefaultValue().toSql());
+      }
+      if (kuduCol.getBlockSize() != 0) {
+        sb.append(String.format(" BLOCK_SIZE %d", kuduCol.getBlockSize()));
+      }
+    }
     if (!Strings.isNullOrEmpty(col.getComment())) {
       sb.append(String.format(" COMMENT '%s'", col.getComment()));
     }
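
With columnToSql() extended this way, SHOW CREATE TABLE round-trips all of a Kudu
column's options. A column line produced by these appends would look roughly like the
following (illustrative values; the encoding and compression tokens are the Kudu enum
names, since the code prints the enums directly):

    ts BIGINT NOT NULL ENCODING PLAIN_ENCODING COMPRESSION LZ4 DEFAULT 0 BLOCK_SIZE 8388608

Note the zero check on BLOCK_SIZE: a block size of 0 means "unset" and is simply not
printed.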

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/Column.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Column.java b/fe/src/main/java/org/apache/impala/catalog/Column.java
index 91928aa..0830f61 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Column.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Column.java
@@ -31,6 +31,8 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.impala.common.ImpalaRuntimeException;
+
 /**
  * Internal representation of column-related metadata.
  * Owned by Catalog instance.
@@ -84,7 +86,7 @@ public class Column {
                   .add("position_", position_).toString();
   }
 
-  public static Column fromThrift(TColumn columnDesc) {
+  public static Column fromThrift(TColumn columnDesc) throws ImpalaRuntimeException {
     String comment = columnDesc.isSetComment() ? columnDesc.getComment() : null;
     Preconditions.checkState(columnDesc.isSetPosition());
     int position = columnDesc.getPosition();
@@ -98,11 +100,7 @@ public class Column {
           columnDesc.getColumn_qualifier(), columnDesc.isIs_binary(),
           Type.fromThrift(columnDesc.getColumnType()), comment, position);
     } else if (columnDesc.isIs_kudu_column()) {
-      Preconditions.checkState(columnDesc.isSetIs_key());
-      Preconditions.checkState(columnDesc.isSetIs_nullable());
-      col = new KuduColumn(columnDesc.getColumnName(), columnDesc.isIs_key(),
-          columnDesc.isIs_nullable(),
-          Type.fromThrift(columnDesc.getColumnType()), comment, position);
+      col = KuduColumn.fromThrift(columnDesc, position);
     } else {
       // Hdfs table column.
       col = new Column(columnDesc.getColumnName(),

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
index 404dbf5..5640748 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduColumn.java
@@ -17,36 +17,108 @@
 
 package org.apache.impala.catalog;
 
+import com.google.common.base.Preconditions;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.common.AnalysisException;
+import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.util.KuduUtil;
+
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
+import org.apache.kudu.ColumnSchema.Encoding;
+import org.apache.kudu.ColumnSchema;
 
 /**
- *  Describes a Kudu column mapped to a Hive column (as described in the metastore).
- *  This class extends Column with Kudu-specific information about whether it is part of a primary
- *  key, and whether it is nullable.
+ *  Represents a Kudu column.
+ *
+ *  This class extends Column with Kudu-specific information:
+ *  - primary key
+ *  - nullability constraint
+ *  - encoding
+ *  - compression
+ *  - default value
+ *  - desired block size
  */
 public class KuduColumn extends Column {
   private final boolean isKey_;
   private final boolean isNullable_;
+  private final Encoding encoding_;
+  private final CompressionAlgorithm compression_;
+  private final LiteralExpr defaultValue_;
+  private final int blockSize_;
 
-  public KuduColumn(String name, boolean isKey, boolean isNullable, Type type,
-      String comment, int position) {
+  private KuduColumn(String name, Type type, boolean isKey, boolean isNullable,
+      Encoding encoding, CompressionAlgorithm compression, LiteralExpr defaultValue,
+      int blockSize, String comment, int position) {
     super(name, type, comment, position);
     isKey_ = isKey;
     isNullable_ = isNullable;
+    encoding_ = encoding;
+    compression_ = compression;
+    defaultValue_ = defaultValue;
+    blockSize_ = blockSize;
+  }
+
+  public static KuduColumn fromColumnSchema(ColumnSchema colSchema, int position)
+      throws ImpalaRuntimeException {
+    Type type = KuduUtil.toImpalaType(colSchema.getType());
+    Object defaultValue = colSchema.getDefaultValue();
+    LiteralExpr defaultValueExpr = null;
+    if (defaultValue != null) {
+      try {
+        defaultValueExpr = LiteralExpr.create(defaultValue.toString(), type);
+      } catch (AnalysisException e) {
+        throw new ImpalaRuntimeException(String.format("Error parsing default value: " +
+            "'%s'", defaultValue), e);
+      }
+      Preconditions.checkNotNull(defaultValueExpr);
+    }
+    return new KuduColumn(colSchema.getName(), type, colSchema.isKey(),
+        colSchema.isNullable(), colSchema.getEncoding(),
+        colSchema.getCompressionAlgorithm(), defaultValueExpr,
+        colSchema.getDesiredBlockSize(), null, position);
+  }
+
+  public static KuduColumn fromThrift(TColumn column, int position)
+      throws ImpalaRuntimeException {
+    Preconditions.checkState(column.isSetIs_key());
+    Preconditions.checkState(column.isSetIs_nullable());
+    Type columnType = Type.fromThrift(column.getColumnType());
+    Encoding encoding = null;
+    if (column.isSetEncoding()) encoding = KuduUtil.fromThrift(column.getEncoding());
+    CompressionAlgorithm compression = null;
+    if (column.isSetCompression()) {
+      compression = KuduUtil.fromThrift(column.getCompression());
+    }
+    LiteralExpr defaultValue = null;
+    if (column.isSetDefault_value()) {
+      defaultValue =
+          LiteralExpr.fromThrift(column.getDefault_value().getNodes().get(0), columnType);
+    }
+    int blockSize = 0;
+    if (column.isSetBlock_size()) blockSize = column.getBlock_size();
+    return new KuduColumn(column.getColumnName(), columnType, column.isIs_key(),
+        column.isIs_nullable(), encoding, compression, defaultValue, blockSize, null,
+        position);
   }
 
   public boolean isKey() { return isKey_; }
   public boolean isNullable() { return isNullable_; }
+  public Encoding getEncoding() { return encoding_; }
+  public CompressionAlgorithm getCompression() { return compression_; }
+  public LiteralExpr getDefaultValue() { return defaultValue_; }
+  public boolean hasDefaultValue() { return defaultValue_ != null; }
+  public int getBlockSize() { return blockSize_; }
 
   @Override
   public TColumn toThrift() {
     TColumn colDesc = new TColumn(name_, type_.toThrift());
+    KuduUtil.setColumnOptions(colDesc, isKey_, isNullable_, encoding_, compression_,
+        defaultValue_, blockSize_);
     if (comment_ != null) colDesc.setComment(comment_);
     colDesc.setCol_stats(getStats().toThrift());
     colDesc.setPosition(position_);
     colDesc.setIs_kudu_column(true);
-    colDesc.setIs_key(isKey_);
-    colDesc.setIs_nullable(isNullable_);
     return colDesc;
   }
 }
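
The two factories give KuduColumn a construction path from either direction:
fromColumnSchema() during table load from the Kudu client, fromThrift() when a
serialized catalog object is applied. A rough usage sketch (exception handling
elided; 'kuduSchema' is a hypothetical org.apache.kudu.Schema):

    // Load path: build the catalog column from the Kudu client schema.
    KuduColumn c1 = KuduColumn.fromColumnSchema(kuduSchema.getColumn("id"), 0);
    // Thrift path: toThrift() serializes the options via
    // KuduUtil.setColumnOptions(), and fromThrift() restores them.
    KuduColumn c2 = KuduColumn.fromThrift(c1.toThrift(), 0);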

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
index 7b906a7..0e88905 100644
--- a/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/KuduTable.java
@@ -215,12 +215,13 @@ public class KuduTable extends Table {
     cols.clear();
     int pos = 0;
     for (ColumnSchema colSchema: kuduTable.getSchema().getColumns()) {
-      Type type = KuduUtil.toImpalaType(colSchema.getType());
-      String colName = colSchema.getName();
-      cols.add(new FieldSchema(colName, type.toSql().toLowerCase(), null));
-      boolean isKey = colSchema.isKey();
-      if (isKey) primaryKeyColumnNames_.add(colName);
-      addColumn(new KuduColumn(colName, isKey, !isKey, type, null, pos));
+      KuduColumn kuduCol = KuduColumn.fromColumnSchema(colSchema, pos);
+      Preconditions.checkNotNull(kuduCol);
+      // Add the HMS column
+      cols.add(new FieldSchema(kuduCol.getName(), kuduCol.getType().toSql().toLowerCase(),
+          null));
+      if (kuduCol.isKey()) primaryKeyColumnNames_.add(kuduCol.getName());
+      addColumn(kuduCol);
       ++pos;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/catalog/Table.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index be9dc7b..7bde786 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -256,12 +256,17 @@ public abstract class Table implements CatalogObject {
 
     colsByPos_.clear();
     colsByPos_.ensureCapacity(columns.size());
-    for (int i = 0; i < columns.size(); ++i) {
-      Column col = Column.fromThrift(columns.get(i));
-      colsByPos_.add(col.getPosition(), col);
-      colsByName_.put(col.getName().toLowerCase(), col);
-      ((StructType) type_.getItemType()).addField(
-          new StructField(col.getName(), col.getType(), col.getComment()));
+    try {
+      for (int i = 0; i < columns.size(); ++i) {
+        Column col = Column.fromThrift(columns.get(i));
+        colsByPos_.add(col.getPosition(), col);
+        colsByName_.put(col.getName().toLowerCase(), col);
+        ((StructType) type_.getItemType()).addField(
+            new StructField(col.getName(), col.getType(), col.getComment()));
+      }
+    } catch (ImpalaRuntimeException e) {
+      throw new TableLoadingException(String.format("Error loading schema for " +
+          "table '%s'", getName()), e);
     }
 
     numClusteringCols_ = thriftTable.getClustering_columns().size();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/planner/Planner.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/planner/Planner.java b/fe/src/main/java/org/apache/impala/planner/Planner.java
index 52524fb..658fd0a 100644
--- a/fe/src/main/java/org/apache/impala/planner/Planner.java
+++ b/fe/src/main/java/org/apache/impala/planner/Planner.java
@@ -186,6 +186,8 @@ public class Planner {
         Table targetTable = ctx_.getAnalysisResult().getInsertStmt().getTargetTable();
         graph.addTargetColumnLabels(targetTable);
         Preconditions.checkNotNull(targetTable);
+        // Lineage is not currently supported for Kudu tables (see IMPALA-4283)
+        if (targetTable instanceof KuduTable) return fragments;
         List<Expr> exprs = Lists.newArrayList();
         if (targetTable instanceof HBaseTable) {
           exprs.addAll(resultExprs);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
index a2b1fb9..068f426 100644
--- a/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
+++ b/fe/src/main/java/org/apache/impala/service/KuduCatalogOpExecutor.java
@@ -31,6 +31,7 @@ import org.apache.impala.catalog.TableNotFoundException;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.ImpalaRuntimeException;
 import org.apache.impala.common.Pair;
+import org.apache.impala.thrift.TColumn;
 import org.apache.impala.thrift.TCreateTableParams;
 import org.apache.impala.thrift.TDistributeParam;
 import org.apache.impala.thrift.TRangePartition;
@@ -74,7 +75,7 @@ public class KuduCatalogOpExecutor {
         throw new ImpalaRuntimeException(String.format(
             "Table '%s' already exists in Kudu.", kuduTableName));
       }
-      Schema schema = createTableSchema(msTbl, params);
+      Schema schema = createTableSchema(params);
       CreateTableOptions tableOpts = buildTableOptions(msTbl, params, schema);
       kudu.createTable(kuduTableName, schema, tableOpts);
     } catch (Exception e) {
@@ -86,22 +87,31 @@ public class KuduCatalogOpExecutor {
   /**
    * Creates the schema of a new Kudu table.
    */
-  private static Schema createTableSchema(
-      org.apache.hadoop.hive.metastore.api.Table msTbl, TCreateTableParams params)
+  private static Schema createTableSchema(TCreateTableParams params)
       throws ImpalaRuntimeException {
     Set<String> keyColNames = new HashSet<>(params.getPrimary_key_column_names());
-    List<FieldSchema> fieldSchemas = msTbl.getSd().getCols();
-    List<ColumnSchema> colSchemas = new ArrayList<>(fieldSchemas.size());
-    for (FieldSchema fieldSchema: fieldSchemas) {
-      Type type = Type.parseColumnType(fieldSchema.getType());
+    List<ColumnSchema> colSchemas = new ArrayList<>(params.getColumnsSize());
+    for (TColumn column: params.getColumns()) {
+      Type type = Type.fromThrift(column.getColumnType());
       Preconditions.checkState(type != null);
       org.apache.kudu.Type kuduType = KuduUtil.fromImpalaType(type);
       // Create the actual column and check if the column is a key column
       ColumnSchemaBuilder csb =
-          new ColumnSchemaBuilder(fieldSchema.getName(), kuduType);
-      boolean isKeyCol = keyColNames.contains(fieldSchema.getName());
-      csb.key(isKeyCol);
-      csb.nullable(!isKeyCol);
+          new ColumnSchemaBuilder(column.getColumnName(), kuduType);
+      Preconditions.checkState(column.isSetIs_key());
+      csb.key(keyColNames.contains(column.getColumnName()));
+      if (column.isSetIs_nullable()) csb.nullable(column.isIs_nullable());
+      if (column.isSetDefault_value()) {
+        csb.defaultValue(KuduUtil.getKuduDefaultValue(column.getDefault_value(), kuduType,
+            column.getColumnName()));
+      }
+      if (column.isSetBlock_size()) csb.desiredBlockSize(column.getBlock_size());
+      if (column.isSetEncoding()) {
+        csb.encoding(KuduUtil.fromThrift(column.getEncoding()));
+      }
+      if (column.isSetCompression()) {
+        csb.compressionAlgorithm(KuduUtil.fromThrift(column.getCompression()));
+      }
       colSchemas.add(csb.build());
     }
     return new Schema(colSchemas);
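
createTableSchema() now reads the column options straight from the thrift columns
rather than reconstructing them from HMS FieldSchemas. The Kudu client builder calls
it chains are shown in isolation below, a hedged standalone sketch with made-up
values:

    // Same ColumnSchemaBuilder calls as above, on one illustrative column.
    ColumnSchema col =
        new ColumnSchema.ColumnSchemaBuilder("price", org.apache.kudu.Type.DOUBLE)
            .key(false)
            .nullable(false)
            .encoding(ColumnSchema.Encoding.PLAIN_ENCODING)
            .compressionAlgorithm(ColumnSchema.CompressionAlgorithm.LZ4)
            .defaultValue(0.0)
            .desiredBlockSize(8 * 1024 * 1024)
            .build();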

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
index 36a586b..9840766 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaParser.java
@@ -29,10 +29,12 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
-import org.codehaus.jackson.JsonNode;
-
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.analysis.TypeDef;
 import org.apache.impala.catalog.ArrayType;
@@ -42,8 +44,7 @@ import org.apache.impala.catalog.StructField;
 import org.apache.impala.catalog.StructType;
 import org.apache.impala.catalog.Type;
 import org.apache.impala.common.AnalysisException;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import org.codehaus.jackson.JsonNode;
 
 /**
  * Utility class used to parse Avro schema. Checks that the schema is valid
@@ -81,9 +82,12 @@ public class AvroSchemaParser {
     }
     List<ColumnDef> colDefs = Lists.newArrayListWithCapacity(schema.getFields().size());
     for (Schema.Field field: schema.getFields()) {
+      Map<ColumnDef.Option, Object> option = Maps.newHashMap();
+      String comment = field.doc();
+      if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
       ColumnDef colDef = new ColumnDef(field.name(),
-          new TypeDef(getTypeInfo(field.schema(), field.name())), field.doc());
-      colDef.analyze();
+          new TypeDef(getTypeInfo(field.schema(), field.name())), option);
+      colDef.analyze(null);
       colDefs.add(colDef);
     }
     return colDefs;
@@ -201,4 +205,4 @@ public class AvroSchemaParser {
     }
     return propValue;
   }
-}
\ No newline at end of file
+}
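
ColumnDef's comment-only constructor has given way to an option map, so callers with
just a comment build a one-entry map. The pattern both Avro call sites follow,
distilled (hedged; the column name, comment, and type here are made up):

    Map<ColumnDef.Option, Object> opts = Maps.newHashMap();
    opts.put(ColumnDef.Option.COMMENT, "record timestamp");
    ColumnDef colDef = new ColumnDef("ts", new TypeDef(Type.BIGINT), opts);
    colDef.analyze(null);  // the Avro call sites above pass a null Analyzer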

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
index f5b3bb4..1da466e 100644
--- a/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AvroSchemaUtils.java
@@ -23,19 +23,19 @@ import java.net.URL;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.io.IOUtils;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-
 import org.apache.impala.analysis.ColumnDef;
 import org.apache.impala.catalog.PrimitiveType;
 import org.apache.impala.common.AnalysisException;
 import org.apache.impala.common.FileSystemUtil;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-
 /**
  * Contains utility functions for dealing with Avro schemas.
  */
@@ -139,10 +139,13 @@ public class AvroSchemaUtils {
       if ((colDef.getType().isStringType() && avroCol.getType().isStringType())) {
         Preconditions.checkState(
             avroCol.getType().getPrimitiveType() == PrimitiveType.STRING);
+        Map<ColumnDef.Option, Object> option = Maps.newHashMap();
+        String comment = avroCol.getComment();
+        if (comment != null) option.put(ColumnDef.Option.COMMENT, comment);
         ColumnDef reconciledColDef = new ColumnDef(
-            avroCol.getColName(), colDef.getTypeDef(), avroCol.getComment());
+            avroCol.getColName(), colDef.getTypeDef(), option);
         try {
-          reconciledColDef.analyze();
+          reconciledColDef.analyze(null);
         } catch (AnalysisException e) {
           Preconditions.checkNotNull(
               null, "reconciledColDef.analyze() should never throw.");

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/3db5ced4/fe/src/main/java/org/apache/impala/util/KuduUtil.java
----------------------------------------------------------------------
diff --git a/fe/src/main/java/org/apache/impala/util/KuduUtil.java b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
index 65fae74..dd09a28 100644
--- a/fe/src/main/java/org/apache/impala/util/KuduUtil.java
+++ b/fe/src/main/java/org/apache/impala/util/KuduUtil.java
@@ -29,18 +29,26 @@ import org.apache.impala.common.Pair;
 import org.apache.impala.service.BackendConfig;
 import org.apache.impala.thrift.TExpr;
 import org.apache.impala.thrift.TExprNode;
+import org.apache.impala.analysis.LiteralExpr;
+import org.apache.impala.analysis.Expr;
+import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TColumnEncoding;
+import org.apache.impala.thrift.THdfsCompression;
+
+import com.google.common.base.Splitter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnSchema.Encoding;
+import org.apache.kudu.ColumnSchema.CompressionAlgorithm;
 import org.apache.kudu.Schema;
 import org.apache.kudu.client.KuduClient;
 import org.apache.kudu.client.KuduClient.KuduClientBuilder;
 import org.apache.kudu.client.PartialRow;
 import org.apache.kudu.client.RangePartitionBound;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 public class KuduUtil {
 
   private static final String KUDU_TABLE_NAME_PREFIX = "impala::";
@@ -138,6 +146,145 @@ public class KuduUtil {
     }
   }
 
+  public static Object getKuduDefaultValue(TExpr defaultValue,
+      org.apache.kudu.Type type, String colName) throws ImpalaRuntimeException {
+    Preconditions.checkState(defaultValue.getNodes().size() == 1);
+    TExprNode literal = defaultValue.getNodes().get(0);
+    switch (type) {
+      case INT8:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (byte) literal.getInt_literal().getValue();
+      case INT16:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (short) literal.getInt_literal().getValue();
+      case INT32:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (int) literal.getInt_literal().getValue();
+      case INT64:
+        checkCorrectType(literal.isSetInt_literal(), type, colName, literal);
+        return (long) literal.getInt_literal().getValue();
+      case FLOAT:
+        checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
+        return (float) literal.getFloat_literal().getValue();
+      case DOUBLE:
+        checkCorrectType(literal.isSetFloat_literal(), type, colName, literal);
+        return (double) literal.getFloat_literal().getValue();
+      case STRING:
+        checkCorrectType(literal.isSetString_literal(), type, colName, literal);
+        return literal.getString_literal().getValue();
+      default:
+        throw new ImpalaRuntimeException("Unsupported value for column type: " +
+            type.toString());
+    }
+  }
+
+  public static Encoding fromThrift(TColumnEncoding encoding)
+      throws ImpalaRuntimeException {
+    switch (encoding) {
+      case AUTO:
+        return Encoding.AUTO_ENCODING;
+      case PLAIN:
+        return Encoding.PLAIN_ENCODING;
+      case PREFIX:
+        return Encoding.PREFIX_ENCODING;
+      case GROUP_VARINT:
+        return Encoding.GROUP_VARINT;
+      case RLE:
+        return Encoding.RLE;
+      case DICTIONARY:
+        return Encoding.DICT_ENCODING;
+      case BIT_SHUFFLE:
+        return Encoding.BIT_SHUFFLE;
+      default:
+        throw new ImpalaRuntimeException("Unsupported encoding: " +
+            encoding.toString());
+    }
+  }
+
+  public static TColumnEncoding toThrift(Encoding encoding)
+      throws ImpalaRuntimeException {
+    switch (encoding) {
+      case AUTO_ENCODING:
+        return TColumnEncoding.AUTO;
+      case PLAIN_ENCODING:
+        return TColumnEncoding.PLAIN;
+      case PREFIX_ENCODING:
+        return TColumnEncoding.PREFIX;
+      case GROUP_VARINT:
+        return TColumnEncoding.GROUP_VARINT;
+      case RLE:
+        return TColumnEncoding.RLE;
+      case DICT_ENCODING:
+        return TColumnEncoding.DICTIONARY;
+      case BIT_SHUFFLE:
+        return TColumnEncoding.BIT_SHUFFLE;
+      default:
+        throw new ImpalaRuntimeException("Unsupported encoding: " +
+            encoding.toString());
+    }
+  }
+
+  public static CompressionAlgorithm fromThrift(THdfsCompression compression)
+      throws ImpalaRuntimeException {
+    switch (compression) {
+      case DEFAULT:
+        return CompressionAlgorithm.DEFAULT_COMPRESSION;
+      case NONE:
+        return CompressionAlgorithm.NO_COMPRESSION;
+      case SNAPPY:
+        return CompressionAlgorithm.SNAPPY;
+      case LZ4:
+        return CompressionAlgorithm.LZ4;
+      case ZLIB:
+        return CompressionAlgorithm.ZLIB;
+      default:
+        throw new ImpalaRuntimeException("Unsupported compression algorithm: " +
+            compression.toString());
+    }
+  }
+
+  public static THdfsCompression toThrift(CompressionAlgorithm compression)
+      throws ImpalaRuntimeException {
+    switch (compression) {
+      case NO_COMPRESSION:
+        return THdfsCompression.NONE;
+      case DEFAULT_COMPRESSION:
+        return THdfsCompression.DEFAULT;
+      case SNAPPY:
+        return THdfsCompression.SNAPPY;
+      case LZ4:
+        return THdfsCompression.LZ4;
+      case ZLIB:
+        return THdfsCompression.ZLIB;
+      default:
+        throw new ImpalaRuntimeException("Unsupported compression algorithm: " +
+            compression.toString());
+    }
+  }
+
+  public static TColumn setColumnOptions(TColumn column, boolean isKey,
+      Boolean isNullable, Encoding encoding, CompressionAlgorithm compression,
+      Expr defaultValue, Integer blockSize) {
+    column.setIs_key(isKey);
+    if (isNullable != null) column.setIs_nullable(isNullable);
+    try {
+      if (encoding != null) column.setEncoding(toThrift(encoding));
+      if (compression != null) column.setCompression(toThrift(compression));
+    } catch (ImpalaRuntimeException e) {
+      // This shouldn't happen
+      throw new IllegalStateException(String.format("Error parsing " +
+          "encoding/compression values for Kudu column '%s': %s", column.getColumnName(),
+          e.getMessage()));
+    }
+
+    if (defaultValue != null) {
+      Preconditions.checkState(defaultValue instanceof LiteralExpr);
+      column.setDefault_value(defaultValue.treeToThrift());
+    }
+    if (blockSize != null) column.setBlock_size(blockSize);
+    return column;
+  }
+
   /**
   * If correctType is true, returns. Otherwise throws an ImpalaRuntimeException with
   * a formatted message indicating a problem with the type of the literal.
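
The encoding and compression mappers are deliberately symmetric, so a value survives
the catalog round trip; for example (enum names from the hunk above; both directions
throw ImpalaRuntimeException for unmapped values):

    TColumnEncoding te = KuduUtil.toThrift(Encoding.DICT_ENCODING);    // DICTIONARY
    Encoding e = KuduUtil.fromThrift(te);                              // DICT_ENCODING
    THdfsCompression tc = KuduUtil.toThrift(CompressionAlgorithm.LZ4); // LZ4
    CompressionAlgorithm c = KuduUtil.fromThrift(tc);                  // LZ4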

