impala-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tarmstr...@apache.org
Subject [04/14] incubator-impala git commit: IMPALA-3719: Simplify CREATE TABLE statements with Kudu tables
Date Sat, 22 Oct 2016 05:33:31 GMT
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/common/kudu_test_suite.py
----------------------------------------------------------------------
diff --git a/tests/common/kudu_test_suite.py b/tests/common/kudu_test_suite.py
new file mode 100644
index 0000000..7a93c12
--- /dev/null
+++ b/tests/common/kudu_test_suite.py
@@ -0,0 +1,148 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import os
+import string
+import pytest
+from contextlib import contextmanager
+from kudu.schema import (
+    BOOL,
+    DOUBLE,
+    FLOAT,
+    INT16,
+    INT32,
+    INT64,
+    INT8,
+    SchemaBuilder,
+    STRING)
+from kudu.client import Partitioning
+from random import choice, sample
+from string import ascii_lowercase, digits
+
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.skip import SkipIf
+from tests.common.test_dimensions import create_uncompressed_text_dimension
+
+class KuduTestSuite(ImpalaTestSuite):
+
+  # Lazily set.
+  __DB_NAME = None
+
+  @classmethod
+  def setup_class(cls):
+    if os.environ["KUDU_IS_SUPPORTED"] == "false":
+      pytest.skip("Kudu is not supported")
+
+    super(KuduTestSuite, cls).setup_class()
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(KuduTestSuite, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
+
+  @classmethod
+  def auto_create_db(cls):
+    return True
+
+  @classmethod
+  def get_db_name(cls):
+    # When py.test runs with the xdist plugin, several processes are started and each
+    # process runs some partition of the tests. It's possible that multiple processes
+    # will call this method. A random value is generated so the processes won't try
+    # to use the same database at the same time. The value is cached so within a single
+    # process the same database name is always used for the class. This doesn't need to
+    # be thread-safe since multi-threading is never used.
+    if not cls.__DB_NAME:
+      cls.__DB_NAME = \
+          choice(ascii_lowercase) + "".join(sample(ascii_lowercase + digits, 5))
+    return cls.__DB_NAME
+
+  @classmethod
+  def random_table_name(cls):
+    return "".join(choice(string.lowercase) for _ in xrange(10))
+
+  @classmethod
+  def get_kudu_table_base_name(cls, name):
+    return name.split(".")[-1]
+
+  @contextmanager
+  def temp_kudu_table(self, kudu, col_types, name=None, num_key_cols=1, col_names=None,
+      prepend_db_name=True, db_name=None):
+    """Create and return a table. This function should be used in a "with" context.
+       'kudu' must be a kudu.client.Client. If a table name is not provided, a random
+       name will be used. If 'prepend_db_name' is True, the table name will be prepended
+       with (get_db_name() + "."). If column names are not provided, the letters
+       "a", "b", "c", ... will be used.
+
+       Example:
+         with self.temp_kudu_table(kudu, [INT32]) as kudu_table:
+            assert kudu.table_exists(kudu_table.name)
+         assert not kudu.table_exists(kudu_table.name)
+    """
+    if not col_names:
+      if len(col_types) > 26:
+        raise Exception("Too many columns for default naming")
+      col_names = [chr(97 + i) for i in xrange(len(col_types))]
+    schema_builder = SchemaBuilder()
+    for i, t in enumerate(col_types):
+      column_spec = schema_builder.add_column(col_names[i], type_=t)
+      if i < num_key_cols:
+        column_spec.nullable(False)
+    schema_builder.set_primary_keys(col_names[:num_key_cols])
+    schema = schema_builder.build()
+    name = name or self.random_table_name()
+    if prepend_db_name:
+      name = (db_name or self.get_db_name().lower()) + "." + name
+    kudu.create_table(name, schema,
+        partitioning=Partitioning().add_hash_partitions(col_names[:num_key_cols], 2))
+    try:
+      yield kudu.table(name)
+    finally:
+      if kudu.table_exists(name):
+        kudu.delete_table(name)
+
+  @contextmanager
+  def drop_impala_table_after_context(self, cursor, table_name):
+    """For use in a "with" block: The named table will be dropped using the provided
+       cursor when the block exits.
+
+       cursor.execute("CREATE TABLE foo ...")
+       with drop_impala_table_after_context(cursor, "foo"):
+         ...
+       # Now table foo no longer exists.
+    """
+    try:
+      yield
+    finally:
+      cursor.execute("DROP TABLE %s" % table_name)
+
+  def kudu_col_type_to_impala_col_type(self, col_type):
+    mapping = {BOOL: "BOOLEAN",
+        DOUBLE: "DOUBLE",
+        FLOAT: "FLOAT",
+        INT16: "SMALLINT",
+        INT32: "INT",
+        INT64: "BIGINT",
+        INT8: "TINYINT",
+        STRING: "STRING"}
+    if col_type not in mapping:
+      raise Exception("Unexpected column type: %s" % col_type)
+    return mapping[col_type]

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/conftest.py
----------------------------------------------------------------------
diff --git a/tests/conftest.py b/tests/conftest.py
index 3193c9e..c22de39 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -27,7 +27,7 @@ import logging
 import os
 import pytest
 
-from common import KUDU_MASTER_HOST, KUDU_MASTER_PORT
+from common import KUDU_MASTER_HOSTS
 from common.test_result_verifier import QueryTestResult
 from tests.common.patterns import is_valid_impala_identifier
 from tests.util.filesystem_utils import FILESYSTEM, ISILON_WEBHDFS_PORT
@@ -288,7 +288,13 @@ def kudu_client():
   """Provides a new Kudu client as a pytest fixture. The client only exists for the
      duration of the method it is used in.
   """
-  kudu_client = kudu_connect(KUDU_MASTER_HOST, KUDU_MASTER_PORT)
+  if "," in KUDU_MASTER_HOSTS:
+    raise Exception("Multi-master not supported yet")
+  if ":" in KUDU_MASTER_HOSTS:
+    host, port = KUDU_MASTER_HOSTS.split(":")
+  else:
+    host, port = KUDU_MASTER_HOSTS, 7051
+  kudu_client = kudu_connect(host, port)
   try:
     yield kudu_client
   finally:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/custom_cluster/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/custom_cluster/test_kudu.py b/tests/custom_cluster/test_kudu.py
new file mode 100644
index 0000000..898a29e
--- /dev/null
+++ b/tests/custom_cluster/test_kudu.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import logging
+import pytest
+from kudu.schema import INT32
+
+from tests.common import KUDU_MASTER_HOSTS
+from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
+from tests.common.kudu_test_suite import KuduTestSuite
+
+LOG = logging.getLogger(__name__)
+
+class TestKuduOperations(CustomClusterTestSuite, KuduTestSuite):
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(impalad_args="-kudu_master_hosts=")
+  def test_kudu_master_hosts(self, cursor, kudu_client):
+    """Check behavior when -kudu_master_hosts is not provided to catalogd."""
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % (kudu_table.name)
+      try:
+        cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (table_name,
+            props))
+        assert False
+      except Exception as e:
+        assert "Table property 'kudu.master_addresses' is required" in str(e)
+
+      cursor.execute("""
+          CREATE EXTERNAL TABLE %s STORED AS KUDU
+          TBLPROPERTIES ('kudu.master_addresses' = '%s',
+          'kudu.table_name'='%s')
+          """ % (table_name, KUDU_MASTER_HOSTS, kudu_table.name))
+      cursor.execute("DROP TABLE %s" % table_name)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/metadata/test_ddl.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_ddl.py b/tests/metadata/test_ddl.py
index 18ed1af..8079855 100644
--- a/tests/metadata/test_ddl.py
+++ b/tests/metadata/test_ddl.py
@@ -220,9 +220,8 @@ class TestDdlStatements(TestDdlBase):
   @SkipIf.kudu_not_supported
   @UniqueDatabase.parametrize(sync_ddl=True)
   def test_create_kudu(self, vector, unique_database):
+    """Runs the QueryTest/kudu_create test file in 'unique_database' with the
+       abort_on_error query option disabled.
+    """
-    self.expected_exceptions = 2
     vector.get_value('exec_option')['abort_on_error'] = False
-    self.run_test_case('QueryTest/create_kudu', vector, use_db=unique_database,
+    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database,
         multiple_impalad=self._use_multiple_impalad(vector))
 
   @UniqueDatabase.parametrize(sync_ddl=True)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/metadata/test_show_create_table.py
----------------------------------------------------------------------
diff --git a/tests/metadata/test_show_create_table.py b/tests/metadata/test_show_create_table.py
index 717223a..aae9f0c 100644
--- a/tests/metadata/test_show_create_table.py
+++ b/tests/metadata/test_show_create_table.py
@@ -54,11 +54,6 @@ class TestShowCreateTable(ImpalaTestSuite):
     self.__run_show_create_table_test_case('QueryTest/show-create-table', vector,
                                            unique_database)
 
-  @SkipIf.kudu_not_supported
-  def test_kudu_show_create_table(self, vector, unique_database):
-    self.__run_show_create_table_test_case('QueryTest/kudu-show-create', vector,
-                                           unique_database)
-
   def __run_show_create_table_test_case(self, test_file_name, vector, unique_db_name):
     """
     Runs a show-create-table test file, containing the following sections:

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/041fa6d9/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index d791608..c22de3e 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -15,104 +15,427 @@
 # specific language governing permissions and limitations
 # under the License.
 
+from kudu.schema import (
+    BOOL,
+    DOUBLE,
+    FLOAT,
+    INT16,
+    INT32,
+    INT64,
+    INT8,
+    STRING)
+import logging
 import pytest
-from copy import copy
+import textwrap
 
-from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
-from tests.common.impala_test_suite import ImpalaTestSuite
-from tests.common.skip import SkipIf
-from tests.common.test_dimensions import create_uncompressed_text_dimension
-from tests.common.test_vector import TestDimension
+from tests.common import KUDU_MASTER_HOSTS
+from tests.common.kudu_test_suite import KuduTestSuite
 
+LOG = logging.getLogger(__name__)
 
-@SkipIf.kudu_not_supported
-class TestKuduOperations(ImpalaTestSuite):
+class TestKuduOperations(KuduTestSuite):
   """
   This suite tests the different modification operations when using a kudu table.
   """
 
-  @classmethod
-  def file_format_constraint(cls, v):
-    return v.get_value('table_format').file_format in ["parquet"]
+  # Each test below runs a .test file with use_db set to the 'unique_database'
+  # fixture.
+  def test_kudu_scan_node(self, vector, unique_database):
+    # NOTE(review): wait_secs_between_stmts=1 presumably gives Kudu time to apply each
+    # statement before the next one runs; confirm.
+    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db=unique_database,
+        wait_secs_between_stmts=1)
 
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
+  def test_kudu_crud(self, vector, unique_database):
+    self.run_test_case('QueryTest/kudu_crud', vector, use_db=unique_database,
+        wait_secs_between_stmts=1)
 
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestKuduOperations, cls).add_test_dimensions()
-    cls.TestMatrix.add_constraint(cls.file_format_constraint)
-
-  # TODO(kudu-merge) IMPALA-3178 DROP DATABASE ... CASCADE is broken in Kudu so we need
-  # to clean up table-by-table. Once solved, delete this and rely on the overriden method.
-  def cleanup_db(self, db_name):
-    self.client.execute("use default")
-    self.client.set_configuration({'sync_ddl': True})
-    if db_name + "\t" in self.client.execute("show databases", ).data:
-      # We use quoted identifiers to avoid name clashes with keywords
-      for tbl_name in self.client.execute("show tables in `" + db_name + "`").data:
-        full_tbl_name = '`%s`.`%s`' % (db_name, tbl_name)
-        result = self.client.execute("describe formatted " + full_tbl_name)
-        if 'VIRTUAL_VIEW' in '\n'.join(result.data):
-          self.client.execute("drop view " + full_tbl_name)
-        else:
-          self.client.execute("drop table " + full_tbl_name)
-      for fn_result in self.client.execute("show functions in `" + db_name + "`").data:
-        # First column is the return type, second is the function signature
-        fn_name = fn_result.split('\t')[1]
-        self.client.execute("drop function `%s`.%s" % (db_name, fn_name))
-      for fn_result in self.client.execute(\
-        "show aggregate functions in `" + db_name + "`").data:
-        fn_name = fn_result.split('\t')[1]
-        self.client.execute("drop function `%s`.%s" % (db_name, fn_name))
-      self.client.execute("drop database `" + db_name + "`")
-
-  def setup_method(self, method):
-    self.cleanup_db("kududb_test")
-    self.client.execute("create database kududb_test")
-
-  def teardown_method(self, method):
-    self.cleanup_db("kududb_test")
+  def test_kudu_partition_ddl(self, vector, unique_database):
+    self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db=unique_database)
 
-  @pytest.mark.execute_serially
-  def test_kudu_scan_node(self, vector):
-    self.run_test_case('QueryTest/kudu-scan-node', vector, use_db="functional_kudu",
-        wait_secs_between_stmts=1)
+  def test_kudu_alter_table(self, vector, unique_database):
+    self.run_test_case('QueryTest/kudu_alter', vector, use_db=unique_database)
 
-  @pytest.mark.execute_serially
-  def test_insert_update_delete(self, vector):
-    self.run_test_case('QueryTest/kudu_crud', vector, use_db="kududb_test",
-        wait_secs_between_stmts=1)
+  def test_kudu_stats(self, vector, unique_database):
+    self.run_test_case('QueryTest/kudu_stats', vector, use_db=unique_database)
 
-  @pytest.mark.execute_serially
-  def test_kudu_partition_ddl(self, vector):
-    self.run_test_case('QueryTest/kudu_partition_ddl', vector, use_db="kududb_test")
 
-  @pytest.mark.execute_serially
-  def test_kudu_alter_table(self, vector):
-    self.run_test_case('QueryTest/kudu_alter', vector, use_db="kududb_test")
+class TestCreateExternalTable(KuduTestSuite):
 
-  @pytest.mark.execute_serially
-  def test_kudu_stats(self, vector):
-    self.run_test_case('QueryTest/kudu_stats', vector, use_db="kududb_test")
+  def test_implicit_table_props(self, cursor, kudu_client):
+    """Check that table properties added internally during table creation are as
+       expected.
+    """
+    with self.temp_kudu_table(kudu_client, [STRING, INT8, BOOL], num_key_cols=2) \
+        as kudu_table:
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
+          props))
+      with self.drop_impala_table_after_context(cursor, impala_table_name):
+        cursor.execute("DESCRIBE FORMATTED %s" % impala_table_name)
+        table_desc = [[col.strip() if col else col for col in row] for row in cursor]
+        LOG.info(table_desc)
+        # Pytest shows truncated output on failure, so print the details just in case.
+        assert ["", "EXTERNAL", "TRUE"] in table_desc
+        assert ["", "kudu.master_addresses", KUDU_MASTER_HOSTS] in table_desc
+        assert ["", "kudu.table_name", kudu_table.name] in table_desc
+        assert ["", "storage_handler", "com.cloudera.kudu.hive.KuduStorageHandler"] \
+            in table_desc
+
+  def test_col_types(self, cursor, kudu_client):
+    """Check that a table can be created using all available column types."""
+    # TODO: The python Kudu client doesn't yet support TIMESTAMP or BYTE[], those should
+    #       be tested for graceful failure.
+    kudu_types = [STRING, BOOL, DOUBLE, FLOAT, INT16, INT32, INT64, INT8]
+    with self.temp_kudu_table(kudu_client, kudu_types) as kudu_table:
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
+          props))
+      with self.drop_impala_table_after_context(cursor, impala_table_name):
+        cursor.execute("DESCRIBE %s" % impala_table_name)
+        kudu_schema = kudu_table.schema
+        for i, (col_name, col_type, _) in enumerate(cursor):
+          kudu_col = kudu_schema[i]
+          assert col_name == kudu_col.name
+          assert col_type.upper() == \
+              self.kudu_col_type_to_impala_col_type(kudu_col.type.type)
+
+  def test_drop_external_table(self, cursor, kudu_client):
+    """Check that dropping an external table only affects the catalog and does not delete
+       the table in Kudu.
+    """
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (impala_table_name,
+          props))
+      with self.drop_impala_table_after_context(cursor, impala_table_name):
+        cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name)
+        assert cursor.fetchall() == [(0, )]
+      try:
+        cursor.execute("SELECT COUNT(*) FROM %s" % impala_table_name)
+        assert False
+      except Exception as e:
+        assert "Could not resolve table reference" in str(e)
+      assert kudu_client.table_exists(kudu_table.name)
+
+  def test_explicit_name(self, cursor, kudu_client):
+    """Check that a Kudu table can be specified using a table property."""
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      table_name = self.random_table_name()
+      cursor.execute("""
+          CREATE EXTERNAL TABLE %s
+          STORED AS KUDU
+          TBLPROPERTIES('kudu.table_name' = '%s')""" % (table_name, kudu_table.name))
+      with self.drop_impala_table_after_context(cursor, table_name):
+        cursor.execute("SELECT * FROM %s" % table_name)
+        assert len(cursor.fetchall()) == 0
+
+  def test_explicit_name_preference(self, cursor, kudu_client):
+    """Check that the table name from a table property is used when a table of the
+       implied name also exists.
+    """
+    with self.temp_kudu_table(kudu_client, [INT64]) as preferred_kudu_table:
+      with self.temp_kudu_table(kudu_client, [INT8]) as other_kudu_table:
+        impala_table_name = self.get_kudu_table_base_name(other_kudu_table.name)
+        cursor.execute("""
+            CREATE EXTERNAL TABLE %s
+            STORED AS KUDU
+            TBLPROPERTIES('kudu.table_name' = '%s')""" % (
+                impala_table_name, preferred_kudu_table.name))
+        with self.drop_impala_table_after_context(cursor, impala_table_name):
+          cursor.execute("DESCRIBE %s" % impala_table_name)
+          assert cursor.fetchall() == [("a", "bigint", "")]
+
+  def test_explicit_name_doesnt_exist(self, cursor, kudu_client):
+    kudu_table_name = self.random_table_name()
+    try:
+      cursor.execute("""
+          CREATE EXTERNAL TABLE %s
+          STORED AS KUDU
+          TBLPROPERTIES('kudu.table_name' = '%s')""" % (
+              self.random_table_name(), kudu_table_name))
+    except Exception as e:
+      assert "Table does not exist in Kudu: '%s'" % kudu_table_name in str(e)
+
+  def test_explicit_name_doesnt_exist_but_implicit_does(self, cursor, kudu_client):
+    """Check that when an explicit table name is given but that table doesn't exist,
+       there is no fall-through to an existing implicit table.
+    """
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      table_name = self.random_table_name()
+      try:
+        cursor.execute("""
+            CREATE EXTERNAL TABLE %s
+            STORED AS KUDU
+            TBLPROPERTIES('kudu.table_name' = '%s')""" % (
+              self.get_kudu_table_base_name(kudu_table.name), table_name))
+      except Exception as e:
+        assert "Table does not exist in Kudu: '%s'" % table_name in str(e)
+
+
+class TestShowCreateTable(KuduTestSuite):
+
+  def assert_show_create_equals(self, cursor, create_sql, show_create_sql):
+    """Executes 'create_sql' to create a table, then runs "SHOW CREATE TABLE" and checks
+       that the output is the same as 'show_create_sql'. 'create_sql' and
+       'show_create_sql' can be templates that can be used with str.format(). format()
+       will be called with 'table' and 'db' as keyword args.
+    """
+    format_args = {"table": self.random_table_name(), "db": cursor.conn.db_name}
+    cursor.execute(create_sql.format(**format_args))
+    cursor.execute("SHOW CREATE TABLE {table}".format(**format_args))
+    assert cursor.fetchall()[0][0] == \
+        textwrap.dedent(show_create_sql.format(**format_args)).strip()
+
+  def test_primary_key_and_distribution(self, cursor):
+    # TODO: Add test cases with column comments once KUDU-1711 is fixed.
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT PRIMARY KEY)
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          PRIMARY KEY (c)
+        )
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT PRIMARY KEY, d STRING)
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) SPLIT ROWS ((1), (2))
+        STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          d STRING,
+          PRIMARY KEY (c)
+        )
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, RANGE (c) SPLIT ROWS (...)
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT, PRIMARY KEY (c))
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          PRIMARY KEY (c)
+        )
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT, d STRING, PRIMARY KEY(c, d))
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS,
+        RANGE (c, d) SPLIT ROWS ((1, 'aaa'), (2, 'bbb')) STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          d STRING,
+          PRIMARY KEY (c, d)
+        )
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS, HASH (d) INTO 3 BUCKETS, RANGE (c, d) SPLIT
ROWS (...)
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {table} (c INT, d STRING, e INT, PRIMARY KEY(c, d))
+        DISTRIBUTE BY RANGE (c) SPLIT ROWS ((1), (2), (3)) STORED AS KUDU""",
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          d STRING,
+          e INT,
+          PRIMARY KEY (c, d)
+        )
+        DISTRIBUTE BY RANGE (c) SPLIT ROWS (...)
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
+
+  def test_properties(self, cursor):
+    # If an explicit table name is used for the Kudu table and it differs from what
+    # would be the default Kudu table name, the name should be shown as a table property.
+    kudu_table = self.random_table_name()
+    props = "'kudu.table_name'='%s'" % kudu_table
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {{table}} (c INT PRIMARY KEY)
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
+        STORED AS KUDU
+        TBLPROPERTIES ({props})""".format(props=props),
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          PRIMARY KEY (c)
+        )
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}', {props})""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS, props=props))
 
+    # If the name is explicitly given (or not given at all) so that the name is the same
+    # as the default name, the table name is not shown.
+    props = "'kudu.table_name'='impala::{db}.{table}'"
+    self.assert_show_create_equals(cursor,
+        """
+        CREATE TABLE {{table}} (c INT PRIMARY KEY)
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
+        STORED AS KUDU
+        TBLPROPERTIES ({props})""".format(props=props),
+        """
+        CREATE TABLE {db}.{{table}} (
+          c INT,
+          PRIMARY KEY (c)
+        )
+        DISTRIBUTE BY HASH (c) INTO 3 BUCKETS
+        STORED AS KUDU
+        TBLPROPERTIES ('kudu.master_addresses'='{kudu_addr}')""".format(
+            db=cursor.conn.db_name, kudu_addr=KUDU_MASTER_HOSTS))
 
-@SkipIf.kudu_not_supported
-class TestKuduMemLimits(ImpalaTestSuite):
-  QUERIES = ["select * from kudu_mem_limit.lineitem where l_orderkey = -1",
-             "select * from kudu_mem_limit.lineitem where l_commitdate like '%cheese'",
-             "select * from kudu_mem_limit.lineitem limit 90"]
+
+class TestDropDb(KuduTestSuite):
+
+  def test_drop_non_empty_db(self, unique_cursor, kudu_client):
+    """Check that an attempt to drop a database will fail if Kudu tables are present
+       and that the tables remain.
+    """
+    db_name = unique_cursor.conn.db_name
+    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+          impala_table_name, props))
+      unique_cursor.execute("USE DEFAULT")
+      try:
+        unique_cursor.execute("DROP DATABASE %s" % db_name)
+        assert False
+      except Exception as e:
+        assert "One or more tables exist" in str(e)
+      unique_cursor.execute("SELECT COUNT(*) FROM %s.%s" % (db_name, impala_table_name))
+      assert unique_cursor.fetchall() == [(0, )]
+
+  def test_drop_db_cascade(self, unique_cursor, kudu_client):
+    """Check that an attempt to drop a database will succeed even if Kudu tables are
+       present and that the managed tables are removed.
+    """
+    db_name = unique_cursor.conn.db_name
+    with self.temp_kudu_table(kudu_client, [INT32], db_name=db_name) as kudu_table:
+      # Create an external Kudu table
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      unique_cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+          impala_table_name, props))
+
+      # Create a managed Kudu table
+      managed_table_name = self.random_table_name()
+      unique_cursor.execute("""
+          CREATE TABLE %s (a INT PRIMARY KEY) DISTRIBUTE BY HASH (a) INTO 3 BUCKETS
+          STORED AS KUDU TBLPROPERTIES ('kudu.table_name' = '%s')"""
+          % (managed_table_name, managed_table_name))
+      assert kudu_client.table_exists(managed_table_name)
+
+      # Create a table in HDFS
+      hdfs_table_name = self.random_table_name()
+      unique_cursor.execute("""
+          CREATE TABLE %s (a INT) PARTITIONED BY (x INT)""" % (hdfs_table_name))
+
+      unique_cursor.execute("USE DEFAULT")
+      unique_cursor.execute("DROP DATABASE %s CASCADE" % db_name)
+      unique_cursor.execute("SHOW DATABASES")
+      assert db_name not in unique_cursor.fetchall()
+      assert kudu_client.table_exists(kudu_table.name)
+      assert not kudu_client.table_exists(managed_table_name)
+
+class TestImpalaKuduIntegration(KuduTestSuite):
+  def test_replace_kudu_table(self, cursor, kudu_client):
+    """Check that an external Kudu table is accessible if the underlying Kudu table is
+        modified using the Kudu client.
+    """
+    # Create an external Kudu table
+    col_names = ['a']
+    with self.temp_kudu_table(kudu_client, [INT32], col_names=col_names) as kudu_table:
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+          impala_table_name, props))
+      cursor.execute("DESCRIBE %s" % (impala_table_name))
+      assert cursor.fetchall() == [("a", "int", "")]
+
+      # Drop the underlying Kudu table and replace it with another Kudu table that has
+      # the same name but different schema
+      kudu_client.delete_table(kudu_table.name)
+      assert not kudu_client.table_exists(kudu_table.name)
+      new_col_names = ['b', 'c']
+      name_parts = kudu_table.name.split(".")
+      assert len(name_parts) == 2
+      with self.temp_kudu_table(kudu_client, [STRING, STRING], col_names=new_col_names,
+          db_name=name_parts[0], name= name_parts[1]) as new_kudu_table:
+        assert kudu_client.table_exists(new_kudu_table.name)
+        # Refresh the external table and verify that the new schema is loaded from
+        # Kudu.
+        cursor.execute("REFRESH %s" % (impala_table_name))
+        cursor.execute("DESCRIBE %s" % (impala_table_name))
+        assert cursor.fetchall() == [("b", "string", ""), ("c", "string", "")]
+
+  def test_delete_external_kudu_table(self, cursor, kudu_client):
+    """Check that Impala can recover from the case where the underlying Kudu table of
+        an external table is dropped using the Kudu client. The external table can be
+        dropped using DROP TABLE IF EXISTS statement.
+    """
+    with self.temp_kudu_table(kudu_client, [INT32]) as kudu_table:
+      # Create an external Kudu table
+      impala_table_name = self.get_kudu_table_base_name(kudu_table.name)
+      props = "TBLPROPERTIES('kudu.table_name'='%s')" % kudu_table.name
+      cursor.execute("CREATE EXTERNAL TABLE %s STORED AS KUDU %s" % (
+          impala_table_name, props))
+      cursor.execute("DESCRIBE %s" % (impala_table_name))
+      assert cursor.fetchall() == [("a", "int", "")]
+      # Drop the underlying Kudu table
+      kudu_client.delete_table(kudu_table.name)
+      assert not kudu_client.table_exists(kudu_table.name)
+      err_msg = 'The table does not exist: table_name: "%s"' % (kudu_table.name)
+      try:
+        cursor.execute("REFRESH %s" % (impala_table_name))
+      except Exception as e:
+        assert err_msg in str(e)
+      cursor.execute("DROP TABLE IF EXISTS %s" % (impala_table_name))
+      cursor.execute("SHOW TABLES")
+      assert impala_table_name not in cursor.fetchall()
+
+  def test_delete_managed_kudu_table(self, cursor, kudu_client, unique_database):
+    """Check that dropping a managed Kudu table works even if the underlying Kudu table
+        has been dropped externally."""
+    impala_tbl_name = "foo"
+    cursor.execute("""CREATE TABLE %s.%s (a INT PRIMARY KEY) DISTRIBUTE BY HASH (a)
+        INTO 3 BUCKETS STORED AS KUDU""" % (unique_database, impala_tbl_name))
+    kudu_tbl_name = "impala::%s.%s" % (unique_database, impala_tbl_name)
+    assert kudu_client.table_exists(kudu_tbl_name)
+    kudu_client.delete_table(kudu_tbl_name)
+    assert not kudu_client.table_exists(kudu_tbl_name)
+    cursor.execute("DROP TABLE IF EXISTS %s" % (impala_tbl_name))
+    cursor.execute("SHOW TABLES")
+    assert impala_tbl_name not in cursor.fetchall()
+
+class TestKuduMemLimits(KuduTestSuite):
+
+  QUERIES = ["select * from lineitem where l_orderkey = -1",
+             "select * from lineitem where l_commitdate like '%cheese'",
+             "select * from lineitem limit 90"]
 
   # The value indicates the minimum memory requirements for the queries above, the first
   # memory limit corresponds to the first query
   QUERY_MEM_LIMITS = [1, 1, 10]
 
-  # The values from this array are used as a mem_limit test dimension
-  TEST_LIMITS = [1, 10, 0]
-
   CREATE = """
-    CREATE TABLE kudu_mem_limit.lineitem (
+    CREATE TABLE lineitem (
     l_orderkey BIGINT,
     l_linenumber INT,
     l_partkey BIGINT,
@@ -128,88 +451,41 @@ class TestKuduMemLimits(ImpalaTestSuite):
     l_receiptdate STRING,
     l_shipinstruct STRING,
     l_shipmode STRING,
-    l_comment STRING
-  )
-  DISTRIBUTE BY HASH (l_orderkey) INTO 9 BUCKETS
-  TBLPROPERTIES(
-    'storage_handler' = 'com.cloudera.kudu.hive.KuduStorageHandler',
-    'kudu.table_name' = 'tpch_lineitem',
-    'kudu.master_addresses' = '127.0.0.1',
-    'kudu.key_columns' = 'l_orderkey,l_linenumber'
-  )
-  """
+    l_comment STRING,
+    PRIMARY KEY (l_orderkey, l_linenumber))
+  DISTRIBUTE BY HASH (l_orderkey, l_linenumber) INTO 3 BUCKETS
+  STORED AS KUDU"""
 
   LOAD = """
-  insert into kudu_mem_limit.lineitem
+  insert into lineitem
   select l_orderkey, l_linenumber, l_partkey, l_suppkey, cast(l_quantity as double),
   cast(l_extendedprice as double), cast(l_discount as double), cast(l_tax as double),
   l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct,
-  l_shipmode, l_comment from tpch_parquet.lineitem;
-  """
-
-  @classmethod
-  def get_workload(cls):
-    return 'functional-query'
+  l_shipmode, l_comment from tpch_parquet.lineitem"""
 
   @classmethod
-  def add_test_dimensions(cls):
-    super(TestKuduMemLimits, cls).add_test_dimensions()
+  def auto_create_db(cls):
+    return True  # presumably: fixtures create/use a per-class DB, hence bare 'lineitem'
 
-    # add mem_limit as a test dimension.
-    new_dimension = TestDimension('mem_limit', *TestKuduMemLimits.TEST_LIMITS)
-    cls.TestMatrix.add_dimension(new_dimension)
-    cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload()))
-
-  @classmethod
-  def setup_class(cls):
-    super(TestKuduMemLimits, cls).setup_class()
-    cls.cleanup_db("kudu_mem_limit")
-    cls.client.execute("create database kudu_mem_limit")
-    cls.client.execute(cls.CREATE)
-    cls.client.execute(cls.LOAD)
-
-  @classmethod
-  def teardown_class(cls):
-    cls.cleanup_db("kudu_mem_limit")
-    super(TestKuduMemLimits, cls).teardown_class()
-
-  # TODO(kudu-merge) IMPALA-3178 DROP DATABASE ... CASCADE is broken in Kudu so we need
-  # to clean up table-by-table. Once solved, delete this and rely on the overriden method.
-  @classmethod
-  def cleanup_db(cls, db_name):
-    cls.client.execute("use default")
-    cls.client.set_configuration({'sync_ddl': True})
-    if db_name + "\t" in cls.client.execute("show databases", ).data:
-      # We use quoted identifiers to avoid name clashes with keywords
-      for tbl_name in cls.client.execute("show tables in `" + db_name + "`").data:
-        full_tbl_name = '`%s`.`%s`' % (db_name, tbl_name)
-        result = cls.client.execute("describe formatted " + full_tbl_name)
-        if 'VIRTUAL_VIEW' in '\n'.join(result.data):
-          cls.client.execute("drop view " + full_tbl_name)
-        else:
-          cls.client.execute("drop table " + full_tbl_name)
-      for fn_result in cls.client.execute("show functions in `" + db_name + "`").data:
-        # First column is the return type, second is the function signature
-        fn_name = fn_result.split('\t')[1]
-        cls.client.execute("drop function `%s`.%s" % (db_name, fn_name))
-      for fn_result in cls.client.execute(\
-        "show aggregate functions in `" + db_name + "`").data:
-        fn_name = fn_result.split('\t')[1]
-        cls.client.execute("drop function `%s`.%s" % (db_name, fn_name))
-      cls.client.execute("drop database `" + db_name + "`")
+  @pytest.fixture(scope='class')
+  def test_data(cls, cls_cursor):
+    cls_cursor.execute(cls.CREATE)
+    cls_cursor.execute(cls.LOAD)
 
   @pytest.mark.execute_serially
-  def test_low_mem_limit_low_selectivity_scan(self, vector):
+  @pytest.mark.usefixtures("test_data")
+  @pytest.mark.parametrize("mem_limit", [1, 10, 0])  # MB; 0 presumably = no limit
+  def test_low_mem_limit_low_selectivity_scan(self, cursor, mem_limit, vector):
     """Tests that the queries specified in this test suite run under the given
     memory limits."""
-    mem_limit = copy(vector.get_value('mem_limit'))
-    exec_options = copy(vector.get_value('exec_option'))
+    exec_options = dict((k, str(v)) for k, v  # private copy; values coerced to str
+                        in vector.get_value('exec_option').iteritems())
     exec_options['mem_limit'] = "{0}m".format(mem_limit)
     for i, q in enumerate(self.QUERIES):
       try:
-        self.execute_query(q, exec_options)
-        pass
-      except ImpalaBeeswaxException as e:
+        cursor.execute(q, configuration=exec_options)
+        cursor.fetchall()  # drain rows inside the try so fetch-time errors are caught
+      except Exception as e:  # NOTE(review): with mem_limit=0 errors are never re-raised
         if (mem_limit > self.QUERY_MEM_LIMITS[i]):
           raise
         assert "Memory limit exceeded" in str(e)



Mime
View raw message