phoenix-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mujt...@apache.org
Subject phoenix git commit: PHOENIX-2359 Configuration for PQS to use Protobuf serialization instead of JSON (elserj)
Date Tue, 17 Nov 2015 00:33:35 GMT
Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.0 1704ee000 -> 7a29510fe


PHOENIX-2359 Configuration for PQS to use Protobuf serialization instead of JSON (elserj)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/7a29510f
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/7a29510f
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/7a29510f

Branch: refs/heads/4.x-HBase-1.0
Commit: 7a29510feddcee97d962b0f378eba97dc31abbe1
Parents: 1704ee0
Author: Mujtaba <mujtaba@apache.org>
Authored: Mon Nov 16 16:33:25 2015 -0800
Committer: Mujtaba <mujtaba@apache.org>
Committed: Mon Nov 16 16:33:25 2015 -0800

----------------------------------------------------------------------
 bin/sqlline-thin.py                             | 30 ++++++-
 .../org/apache/phoenix/query/QueryServices.java | 13 ++-
 .../phoenix/query/QueryServicesOptions.java     | 16 ++++
 .../queryserver/client/ThinClientUtil.java      | 11 ++-
 .../phoenix/end2end/QueryServerBasicsIT.java    |  4 +-
 .../apache/phoenix/queryserver/server/Main.java | 89 +++++++++++---------
 pom.xml                                         |  2 +-
 7 files changed, 116 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/bin/sqlline-thin.py
----------------------------------------------------------------------
diff --git a/bin/sqlline-thin.py b/bin/sqlline-thin.py
index 9877def..2e237ed 100755
--- a/bin/sqlline-thin.py
+++ b/bin/sqlline-thin.py
@@ -40,6 +40,7 @@ phoenix_utils.setPath()
 
 url = "localhost:8765"
 sqlfile = ""
+serialization_key = 'phoenix.queryserver.serialization'
 
 def usage_and_exit():
     sys.exit("usage: sqlline-thin.py [host[:port]] [sql_file]")
@@ -53,6 +54,29 @@ def cleanup_url(url):
         url = url + ":8765"
     return url
 
+def get_serialization():
+    default_serialization='PROTOBUF'
+    env=os.environ.copy()
+    hbase_cmd = phoenix_utils.which('hbase')
+    if hbase_cmd is None:
+        print 'Failed to find hbase executable on PATH, defaulting serialization to %s.' % default_serialization
+        return default_serialization
+
+    env['HBASE_CONF_DIR'] = phoenix_utils.hbase_conf_dir
+    proc = subprocess.Popen([hbase_cmd, 'org.apache.hadoop.hbase.util.HBaseConfTool', serialization_key],
+            env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+    (stdout, stderr) = proc.communicate()
+    if proc.returncode != 0:
+        print 'Failed to extract serialization from hbase-site.xml, defaulting to %s.' % default_serialization
+        return default_serialization
+    # Don't expect this to happen, but give a default value just in case
+    if stdout is None:
+        return default_serialization
+
+    stdout = stdout.strip()
+    if stdout == 'null':
+        return default_serialization
+    return stdout
 
 if len(sys.argv) == 1:
     pass
@@ -81,6 +105,8 @@ if os.name == 'nt':
 # HBase/Phoenix client side property override
 hbase_config_path = os.getenv('HBASE_CONF_DIR', phoenix_utils.current_dir)
 
+serialization = get_serialization()
+
 java_home = os.getenv('JAVA_HOME')
 
 # load hbase-env.??? to extract JAVA_HOME, HBASE_PID_DIR, HBASE_LOG_DIR
@@ -112,10 +138,10 @@ else:
     java = 'java'
 
 java_cmd = java + ' -cp "' + phoenix_utils.hbase_conf_dir + os.pathsep + phoenix_utils.phoenix_thin_client_jar + \
-    os.pathsep + phoenix_utils.hadoop_conf + os.pathsep + phoenix_utils.hadoop_classpath + '" -Dlog4j.configuration=file:' + \ 
+    os.pathsep + phoenix_utils.hadoop_conf + os.pathsep + phoenix_utils.hadoop_classpath + '" -Dlog4j.configuration=file:' + \
     os.path.join(phoenix_utils.current_dir, "log4j.properties") + \
     " sqlline.SqlLine -d org.apache.phoenix.queryserver.client.Driver " + \
-    " -u jdbc:phoenix:thin:url=" + url + \
+    " -u jdbc:phoenix:thin:url='" + url + ";serialization=" + serialization + "'" + \
     " -n none -p none --color=" + colorSetting + " --fastConnect=false --verbose=true " + \
     " --isolation=TRANSACTION_READ_COMMITTED " + sqlfile
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 2a77717..4ac4e96 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -171,7 +171,18 @@ public interface QueryServices extends SQLCloseable {
     public static final String ALLOW_VIEWS_ADD_NEW_CF_BASE_TABLE = "phoenix.schema.view.newcf";
     public static final String RETURN_SEQUENCE_VALUES_ATTRIB = "phoenix.sequence.returnValues";
     public static final String EXTRA_JDBC_ARGUMENTS_ATTRIB = "phoenix.jdbc.extra.arguments";
-    
+
+    // queryserver configuration keys
+    public static final String QUERY_SERVER_SERIALIZATION_ATTRIB = "phoenix.queryserver.serialization";
+    public static final String QUERY_SERVER_META_FACTORY_ATTRIB = "phoenix.queryserver.metafactory.class";
+    public static final String QUERY_SERVER_HTTP_PORT_ATTRIB = "phoenix.queryserver.http.port";
+    public static final String QUERY_SERVER_ENV_LOGGING_ATTRIB = "phoenix.queryserver.envvars.logging.disabled";
+    public static final String QUERY_SERVER_ENV_LOGGING_SKIPWORDS_ATTRIB = "phoenix.queryserver.envvars.logging.skipwords";
+    public static final String QUERY_SERVER_KEYTAB_FILENAME_ATTRIB = "phoenix.queryserver.keytab.file";
+    public static final String QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB = "phoenix.queryserver.kerberos.principal";
+    public static final String QUERY_SERVER_DNS_NAMESERVER_ATTRIB = "phoenix.queryserver.dns.nameserver";
+    public static final String QUERY_SERVER_DNS_INTERFACE_ATTRIB = "phoenix.queryserver.dns.interface";
+    public static final String QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB = "hbase.security.authentication";
 
     /**
      * Get executor service used for parallel scans

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 0157f75..ea41b41 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -68,6 +68,8 @@ import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_BYTE_BASED_REGEX_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.USE_INDEXES_ATTRIB;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.conf.Configuration;
@@ -205,6 +207,20 @@ public class QueryServicesOptions {
     public static final boolean DEFAULT_RETURN_SEQUENCE_VALUES = false;
     public static final String DEFAULT_EXTRA_JDBC_ARGUMENTS = "";
 
+    // QueryServer defaults -- ensure ThinClientUtil is also updated since phoenix-server-client
+    // doesn't depend on phoenix-core.
+    public static final String DEFAULT_QUERY_SERVER_SERIALIZATION = "PROTOBUF";
+    public static final int DEFAULT_QUERY_SERVER_HTTP_PORT = 8765;
+    @SuppressWarnings("serial")
+    public static final Set<String> DEFAULT_QUERY_SERVER_SKIP_WORDS = new HashSet<String>() {
+      {
+        add("secret");
+        add("passwd");
+        add("password");
+        add("credential");
+      }
+    };
+
     private final Configuration config;
 
     private QueryServicesOptions(Configuration config) {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/phoenix-server-client/src/main/java/org/apache/phoenix/queryserver/client/ThinClientUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-server-client/src/main/java/org/apache/phoenix/queryserver/client/ThinClientUtil.java b/phoenix-server-client/src/main/java/org/apache/phoenix/queryserver/client/ThinClientUtil.java
index 0ef1c8c..af56c33 100644
--- a/phoenix-server-client/src/main/java/org/apache/phoenix/queryserver/client/ThinClientUtil.java
+++ b/phoenix-server-client/src/main/java/org/apache/phoenix/queryserver/client/ThinClientUtil.java
@@ -21,6 +21,9 @@ package org.apache.phoenix.queryserver.client;
  * Utilities for thin clients.
  */
 public final class ThinClientUtil {
+  // The default serialization is also defined in QueryServicesOptions. phoenix-server-client
+  // currently doesn't depend on phoenix-core so we have to deal with the duplication.
+  private static final String DEFAULT_SERIALIZATION = "PROTOBUF";
 
   private ThinClientUtil() {}
 
@@ -29,7 +32,11 @@ public final class ThinClientUtil {
   }
 
   public static String getConnectionUrl(String protocol, String hostname, int port) {
-    String urlFmt = Driver.CONNECT_STRING_PREFIX + "url=%s://%s:%s";
-    return String.format(urlFmt, protocol, hostname, port);
+    return getConnectionUrl(protocol, hostname, port, DEFAULT_SERIALIZATION);
+  }
+
+  public static String getConnectionUrl(String protocol, String hostname, int port, String serialization) {
+    String urlFmt = Driver.CONNECT_STRING_PREFIX + "url=%s://%s:%s;serialization=%s";
+    return String.format(urlFmt, protocol, hostname, port, serialization);
   }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/phoenix-server/src/it/java/org/apache/phoenix/end2end/QueryServerBasicsIT.java
----------------------------------------------------------------------
diff --git a/phoenix-server/src/it/java/org/apache/phoenix/end2end/QueryServerBasicsIT.java b/phoenix-server/src/it/java/org/apache/phoenix/end2end/QueryServerBasicsIT.java
index 3b18c4e..3003f31 100644
--- a/phoenix-server/src/it/java/org/apache/phoenix/end2end/QueryServerBasicsIT.java
+++ b/phoenix-server/src/it/java/org/apache/phoenix/end2end/QueryServerBasicsIT.java
@@ -20,8 +20,8 @@ package org.apache.phoenix.end2end;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.queryserver.client.ThinClientUtil;
-import org.apache.phoenix.queryserver.server.Main;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -57,7 +57,7 @@ public class QueryServerBasicsIT extends BaseHBaseManagedTimeIT {
   @BeforeClass
   public static void beforeClass() throws Exception {
     CONF = getTestClusterConfig();
-    CONF.setInt(Main.QUERY_SERVER_HTTP_PORT_KEY, 0);
+    CONF.setInt(QueryServices.QUERY_SERVER_HTTP_PORT_ATTRIB, 0);
     String url = getUrl();
     AVATICA_SERVER = new QueryServerThread(new String[] { url }, CONF,
             QueryServerBasicsIT.class.getName());

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/phoenix-server/src/main/java/org/apache/phoenix/queryserver/server/Main.java
----------------------------------------------------------------------
diff --git a/phoenix-server/src/main/java/org/apache/phoenix/queryserver/server/Main.java b/phoenix-server/src/main/java/org/apache/phoenix/queryserver/server/Main.java
index 9f9bfc7..106d422 100644
--- a/phoenix-server/src/main/java/org/apache/phoenix/queryserver/server/Main.java
+++ b/phoenix-server/src/main/java/org/apache/phoenix/queryserver/server/Main.java
@@ -19,9 +19,10 @@ package org.apache.phoenix.queryserver.server;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.remote.Driver;
 import org.apache.calcite.avatica.remote.LocalService;
 import org.apache.calcite.avatica.remote.Service;
-import org.apache.calcite.avatica.server.AvaticaHandler;
+import org.apache.calcite.avatica.server.HandlerFactory;
 import org.apache.calcite.avatica.server.HttpServer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -33,6 +34,9 @@ import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.eclipse.jetty.server.Handler;
 
 import java.lang.management.ManagementFactory;
 import java.lang.management.RuntimeMXBean;
@@ -48,36 +52,8 @@ import java.util.concurrent.TimeUnit;
  */
 public final class Main extends Configured implements Tool, Runnable {
 
-  public static final String QUERY_SERVER_META_FACTORY_KEY =
-      "phoenix.queryserver.metafactory.class";
-
-  public static final String QUERY_SERVER_HTTP_PORT_KEY =
-      "phoenix.queryserver.http.port";
-  public static final int DEFAULT_HTTP_PORT = 8765;
-
-  public static final String QUERY_SERVER_ENV_LOGGING_KEY =
-          "phoenix.queryserver.envvars.logging.disabled";
-  public static final String QUERY_SERVER_ENV_LOGGING_SKIPWORDS_KEY =
-          "phoenix.queryserver.envvars.logging.skipwords";
-
-  public static final String KEYTAB_FILENAME_KEY = "phoenix.queryserver.keytab.file";
-  public static final String KERBEROS_PRINCIPAL_KEY = "phoenix.queryserver.kerberos.principal";
-  public static final String DNS_NAMESERVER_KEY = "phoenix.queryserver.dns.nameserver";
-  public static final String DNS_INTERFACE_KEY = "phoenix.queryserver.dns.interface";
-  public static final String HBASE_SECURITY_CONF_KEY = "hbase.security.authentication";
-
   protected static final Log LOG = LogFactory.getLog(Main.class);
 
-  @SuppressWarnings("serial")
-  private static final Set<String> DEFAULT_SKIP_WORDS = new HashSet<String>() {
-    {
-      add("secret");
-      add("passwd");
-      add("password");
-      add("credential");
-    }
-  };
-
   private final String[] argv;
   private final CountDownLatch runningLatch = new CountDownLatch(1);
   private HttpServer server = null;
@@ -107,10 +83,10 @@ public final class Main extends Configured implements Tool, Runnable {
    */
   public static void logProcessInfo(Configuration conf) {
     // log environment variables unless asked not to
-    if (conf == null || !conf.getBoolean(QUERY_SERVER_ENV_LOGGING_KEY, false)) {
-      Set<String> skipWords = new HashSet<String>(DEFAULT_SKIP_WORDS);
+    if (conf == null || !conf.getBoolean(QueryServices.QUERY_SERVER_ENV_LOGGING_ATTRIB, false)) {
+      Set<String> skipWords = new HashSet<String>(QueryServicesOptions.DEFAULT_QUERY_SERVER_SKIP_WORDS);
       if (conf != null) {
-        String[] confSkipWords = conf.getStrings(QUERY_SERVER_ENV_LOGGING_SKIPWORDS_KEY);
+        String[] confSkipWords = conf.getStrings(QueryServices.QUERY_SERVER_ENV_LOGGING_SKIPWORDS_ATTRIB);
         if (confSkipWords != null) {
           skipWords.addAll(Arrays.asList(confSkipWords));
         }
@@ -183,26 +159,29 @@ public final class Main extends Configured implements Tool, Runnable {
     logProcessInfo(getConf());
     try {
       // handle secure cluster credentials
-      if ("kerberos".equalsIgnoreCase(getConf().get(HBASE_SECURITY_CONF_KEY))) {
+      if ("kerberos".equalsIgnoreCase(getConf().get(QueryServices.QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB))) {
         String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
-            getConf().get(DNS_INTERFACE_KEY, "default"),
-            getConf().get(DNS_NAMESERVER_KEY, "default")));
+            getConf().get(QueryServices.QUERY_SERVER_DNS_INTERFACE_ATTRIB, "default"),
+            getConf().get(QueryServices.QUERY_SERVER_DNS_NAMESERVER_ATTRIB, "default")));
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Login to " + hostname + " using " + getConf().get(KEYTAB_FILENAME_KEY)
-              + " and principal " + getConf().get(KERBEROS_PRINCIPAL_KEY) + ".");
+          LOG.debug("Login to " + hostname + " using " + getConf().get(QueryServices.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB)
+              + " and principal " + getConf().get(QueryServices.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB) + ".");
         }
-        SecurityUtil.login(getConf(), KEYTAB_FILENAME_KEY, KERBEROS_PRINCIPAL_KEY, hostname);
+        SecurityUtil.login(getConf(), QueryServices.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB,
+            QueryServices.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB, hostname);
         LOG.info("Login successful.");
       }
       Class<? extends PhoenixMetaFactory> factoryClass = getConf().getClass(
-          QUERY_SERVER_META_FACTORY_KEY, PhoenixMetaFactoryImpl.class, PhoenixMetaFactory.class);
-      int port = getConf().getInt(QUERY_SERVER_HTTP_PORT_KEY, DEFAULT_HTTP_PORT);
+          QueryServices.QUERY_SERVER_META_FACTORY_ATTRIB, PhoenixMetaFactoryImpl.class, PhoenixMetaFactory.class);
+      int port = getConf().getInt(QueryServices.QUERY_SERVER_HTTP_PORT_ATTRIB,
+          QueryServicesOptions.DEFAULT_QUERY_SERVER_HTTP_PORT);
       LOG.debug("Listening on port " + port);
       PhoenixMetaFactory factory =
           factoryClass.getDeclaredConstructor(Configuration.class).newInstance(getConf());
       Meta meta = factory.create(Arrays.asList(args));
+      final HandlerFactory handlerFactory = new HandlerFactory();
       Service service = new LocalService(meta);
-      server = new HttpServer(port, new AvaticaHandler(service));
+      server = new HttpServer(port, getHandler(getConf(), service, handlerFactory));
       server.start();
       runningLatch.countDown();
       server.join();
@@ -214,6 +193,34 @@ public final class Main extends Configured implements Tool, Runnable {
     }
   }
 
+  /**
+   * Instantiates the Handler for use by the Avatica (Jetty) server.
+   *
+   * @param conf The configuration
+   * @param service The Avatica Service implementation
+   * @param handlerFactory Factory used for creating a Handler
+   * @return The Handler to use based on the configuration.
+   */
+  Handler getHandler(Configuration conf, Service service, HandlerFactory handlerFactory) {
+    String serializationName = conf.get(QueryServices.QUERY_SERVER_SERIALIZATION_ATTRIB,
+        QueryServicesOptions.DEFAULT_QUERY_SERVER_SERIALIZATION);
+
+    Driver.Serialization serialization;
+    // Otherwise, use what was provided in the configuration
+    try {
+      serialization = Driver.Serialization.valueOf(serializationName);
+    } catch (Exception e) {
+      LOG.error("Unknown message serialization type for " + serializationName);
+      throw e;
+    }
+
+    Handler handler = handlerFactory.getHandler(service, serialization);
+
+    LOG.info("Instantiated " + handler.getClass() + " for QueryServer");
+
+    return handler;
+  }
+
   @Override public void run() {
     try {
       retCode = run(argv);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/7a29510f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fd872ab..f84971e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@
     <collections.version>3.2.1</collections.version>
     <jodatime.version>2.7</jodatime.version>
     <joni.version>2.1.2</joni.version>
-    <calcite.version>1.3.0-incubating</calcite.version>
+    <calcite.version>1.5.0</calcite.version>
     <jettyVersion>8.1.7.v20120910</jettyVersion>
 
     <!-- Test Dependencies -->


Mime
View raw message