kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject [2/2] kudu git commit: [python] - Update kudu.connect to enable multi-master
Date Mon, 31 Oct 2016 22:03:38 GMT
[python] - Update kudu.connect to enable multi-master

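Currently, the kudu.connect method cannot connect to a multi-master cluster.
This patch adds that support by accepting a list of master hosts (and,
optionally, a matching list of ports), and it gives the port parameter a
default value of 7051. Additionally, this patch modifies the existing test
bed to run against a multi-master cluster.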

Change-Id: Ia2721236d5f92ced2afb4a867511c4144a2ab16a
Reviewed-on: http://gerrit.cloudera.org:8080/4883
Tested-by: Kudu Jenkins
Reviewed-by: Will Berkeley <wdberkeley@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/6f54154f
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/6f54154f
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/6f54154f

Branch: refs/heads/master
Commit: 6f54154fdec76cbab966d0d222b5c24b27f8c7ba
Parents: c91d04a
Author: Jordan Birdsell <jordantbirdsell@gmail.com>
Authored: Sat Oct 29 13:04:11 2016 -0400
Committer: Will Berkeley <wdberkeley@gmail.com>
Committed: Sun Oct 30 16:10:20 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py             |  31 +++++++---
 python/kudu/client.pyx              |   5 ++
 python/kudu/tests/common.py         | 100 ++++++++++++++++---------------
 python/kudu/tests/test_client.py    |   4 +-
 python/kudu/tests/test_scantoken.py |   4 +-
 5 files changed, 84 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/6f54154f/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 8ff299c..828c3cf 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -49,16 +49,17 @@ from kudu.schema import (int8, int16, int32, int64, string_ as string,
 # noqa
                          ENCODING_RLE)
 
 
-def connect(host, port, admin_timeout_ms=None, rpc_timeout_ms=None):
+def connect(host, port=7051, admin_timeout_ms=None, rpc_timeout_ms=None):
     """
     Connect to a Kudu master server
 
     Parameters
     ----------
-    host : string
-      Server address of master
-    port : int
-      Server port
+    host : string/list
+      Server address of master or a list of addresses
+    port : int/list, optional, default 7051
+      Server port or list of ports. If a list of addresses is provided and
+      only a single port, that port will be used for all master addresses.
     admin_timeout_ms : int, optional
       Admin timeout in milliseconds
     rpc_timeout_ms : int, optional
@@ -68,8 +69,24 @@ def connect(host, port, admin_timeout_ms=None, rpc_timeout_ms=None):
     -------
     client : kudu.Client
     """
-    addr = '{0}:{1}'.format(host, port)
-    return Client(addr, admin_timeout_ms=admin_timeout_ms,
+    addresses = []
+    if isinstance(host, list):
+        if isinstance(port, list):
+            if len(host) == len(port):
+                for h, p in zip(host, port):
+                    addresses.append('{0}:{1}'.format(h, p))
+            else:
+                raise ValueError("Host and port lists are not of equal length.")
+        else:
+            for h in host:
+                addresses.append('{0}:{1}'.format(h, port))
+    else:
+        if isinstance(port, list):
+            raise ValueError("List of ports provided but only a single host.")
+        else:
+            addresses.append('{0}:{1}'.format(host, port))
+
+    return Client(addresses, admin_timeout_ms=admin_timeout_ms,
                   rpc_timeout_ms=rpc_timeout_ms)
 
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6f54154f/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 4a0a84f..23e2edb 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -229,6 +229,11 @@ cdef class Client:
         elif not isinstance(addr_or_addrs, list):
             addr_or_addrs = list(addr_or_addrs)
 
+        # Raise exception for empty iters, otherwise the connection call
+        # will hang
+        if not addr_or_addrs:
+            raise ValueError("Empty iterator for addr_or_addrs.")
+
         self.master_addrs = addr_or_addrs
         for addr in addr_or_addrs:
             c_addrs.push_back(tobytes(addr))

http://git-wip-us.apache.org/repos/asf/kudu/blob/6f54154f/python/kudu/tests/common.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/common.py b/python/kudu/tests/common.py
index f39c074..69738e8 100644
--- a/python/kudu/tests/common.py
+++ b/python/kudu/tests/common.py
@@ -25,6 +25,7 @@ import shutil
 import subprocess
 import tempfile
 import time
+import socket
 
 import kudu
 from kudu.client import Partitioning
@@ -39,6 +40,7 @@ class KuduTestBase(object):
     BASE_PORT = 37000
     NUM_TABLET_SERVERS = 3
     TSERVER_START_TIMEOUT_SECS = 10
+    NUM_MASTER_SERVERS = 3
 
     @classmethod
     def start_cluster(cls):
@@ -48,47 +50,50 @@ class KuduTestBase(object):
             kudu_build = os.path.join(os.getenv("KUDU_HOME"), "build", "latest")
         bin_path = "{0}/bin".format(kudu_build)
 
-        os.makedirs("{0}/master/".format(local_path))
-        os.makedirs("{0}/master/data".format(local_path))
-        os.makedirs("{0}/master/logs".format(local_path))
-
-        path = [
-            "{0}/kudu-master".format(bin_path),
-            "-unlock_unsafe_flags",
-            "-unlock_experimental_flags",
-            "-rpc_server_allow_ephemeral_ports",
-            "-rpc_bind_addresses=0.0.0.0:0",
-            "-fs_wal_dir={0}/master/data".format(local_path),
-            "-fs_data_dirs={0}/master/data".format(local_path),
-            "-log_dir={0}/master/logs".format(local_path),
-            "-logtostderr",
-            "-webserver_port=0",
-            # Only make one replica so that our tests don't need to worry about
-            # setting consistency modes.
-            "-default_num_replicas=1",
-            "-server_dump_info_path={0}/master/config.json".format(local_path)
-        ]
-
-        p = subprocess.Popen(path, shell=False)
-        fid = open("{0}/master/kudu-master.pid".format(local_path), "w+")
-        fid.write("{0}".format(p.pid))
-        fid.close()
-
-        # We have to wait for the master to settle before the config file
-        # appears
-        config_file = "{0}/master/config.json".format(local_path)
-        for i in range(30):
-            if os.path.exists(config_file):
-                break
-            time.sleep(0.1 * (i + 1))
-        else:
-            raise Exception("Could not find kudu-master config file")
-
-        # If the server was started get the bind port from the config dump
-        master_config = json.load(open("{0}/master/config.json"
-                                       .format(local_path), "r"))
-        # One master bound on local host
-        master_port = master_config["bound_rpc_addresses"][0]["port"]
+        master_hosts = []
+        master_ports = []
+
+        # We need to get the port numbers for the masters before starting them
+        # so that we can appropriately configure a multi-master.
+        for m in range(cls.NUM_MASTER_SERVERS):
+            master_hosts.append('127.0.0.1')
+            # This introduces a race
+            s = socket.socket()
+            s.bind(('', 0))
+            master_ports.append(s.getsockname()[1])
+            s.close()
+
+        multi_master_string = ','.join('{0}:{1}'.format(host, port)
+                                       for host, port
+                                       in zip(master_hosts, master_ports))
+
+        for m in range(cls.NUM_MASTER_SERVERS):
+            os.makedirs("{0}/master/{1}".format(local_path, m))
+            os.makedirs("{0}/master/{1}/data".format(local_path, m))
+            os.makedirs("{0}/master/{1}/logs".format(local_path, m))
+
+
+            path = [
+                "{0}/kudu-master".format(bin_path),
+                "-unlock_unsafe_flags",
+                "-unlock_experimental_flags",
+                "-rpc_server_allow_ephemeral_ports",
+                "-rpc_bind_addresses=0.0.0.0:{0}".format(master_ports[m]),
+                "-fs_wal_dir={0}/master/{1}/data".format(local_path, m),
+                "-fs_data_dirs={0}/master/{1}/data".format(local_path, m),
+                "-log_dir={0}/master/{1}/logs".format(local_path, m),
+                "-logtostderr",
+                "-webserver_port=0",
+                "-master_addresses={0}".format(multi_master_string),
+                # Only make one replica so that our tests don't need to worry about
+                # setting consistency modes.
+                "-default_num_replicas=1"
+            ]
+
+            p = subprocess.Popen(path, shell=False)
+            fid = open("{0}/master/{1}/kudu-master.pid".format(local_path, m), "w+")
+            fid.write("{0}".format(p.pid))
+            fid.close()
 
         for m in range(cls.NUM_TABLET_SERVERS):
             os.makedirs("{0}/ts/{1}".format(local_path, m))
@@ -100,9 +105,9 @@ class KuduTestBase(object):
                 "-unlock_experimental_flags",
                 "-rpc_server_allow_ephemeral_ports",
                 "-rpc_bind_addresses=0.0.0.0:0",
-                "-tserver_master_addrs=127.0.0.1:{0}".format(master_port),
+                "-tserver_master_addrs={0}".format(multi_master_string),
                 "-webserver_port=0",
-                "-log_dir={0}/master/logs".format(local_path),
+                "-log_dir={0}/ts/{1}/logs".format(local_path, m),
                 "-logtostderr",
                 "-fs_data_dirs={0}/ts/{1}/data".format(local_path, m),
                 "-fs_wal_dir={0}/ts/{1}/data".format(local_path, m),
@@ -113,7 +118,7 @@ class KuduTestBase(object):
             fid.write("{0}".format(p.pid))
             fid.close()
 
-        return local_path, master_port
+        return local_path, master_hosts, master_ports
 
     @classmethod
     def stop_cluster(cls, path):
@@ -128,13 +133,10 @@ class KuduTestBase(object):
 
     @classmethod
     def setUpClass(cls):
-        cls.cluster_path, master_port = cls.start_cluster()
+        cls.cluster_path, cls.master_hosts, cls.master_ports = cls.start_cluster()
         time.sleep(1)
 
-        cls.master_host = '127.0.0.1'
-        cls.master_port = master_port
-
-        cls.client = kudu.connect(cls.master_host, cls.master_port)
+        cls.client = kudu.connect(cls.master_hosts, cls.master_ports)
 
         # Wait for all tablet servers to start with the configured timeout
         timeout = time.time() + cls.TSERVER_START_TIMEOUT_SECS

http://git-wip-us.apache.org/repos/asf/kudu/blob/6f54154f/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 65e7c29..b5159ee 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -83,7 +83,7 @@ class TestClient(KuduTestBase, unittest.TestCase):
             self.client.delete_table(name)
 
     def test_is_multimaster(self):
-        assert not self.client.is_multimaster
+        assert self.client.is_multimaster
 
     def test_delete_table(self):
         name = "peekaboo"
@@ -226,7 +226,7 @@ class TestClient(KuduTestBase, unittest.TestCase):
 
     def test_connect_timeouts(self):
         # it works! any other way to check
-        kudu.connect(self.master_host, self.master_port,
+        kudu.connect(self.master_hosts, self.master_ports,
                      admin_timeout_ms=100,
                      rpc_timeout_ms=100)
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/6f54154f/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index fbe66df..e027cf5 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -24,7 +24,7 @@ from multiprocessing import Pool
 import datetime
 
 def _get_scan_token_results(input):
-    client = kudu.Client("{0}:{1}".format(input[1], input[2]))
+    client = kudu.connect(input[1], input[2])
     scanner = client.deserialize_token_into_scanner(input[0])
     scanner.open()
     return scanner.read_all_tuples()
@@ -43,7 +43,7 @@ class TestScanToken(TestScanBase):
         Given the input serialized tokens, spawn new threads,
         execute them and validate the results
         """
-        input =  [(token.serialize(), self.master_host, self.master_port)
+        input =  [(token.serialize(), self.master_hosts, self.master_ports)
                 for token in tokens]
 
         # Begin process pool


Mime
View raw message