zeppelin-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jongy...@apache.org
Subject zeppelin git commit: [ZEPPELIN-1405] ConnectionPool for JDBCInterpreter.
Date Wed, 28 Sep 2016 14:10:47 GMT
Repository: zeppelin
Updated Branches:
  refs/heads/master 46ecf77b7 -> b24491baf


[ZEPPELIN-1405] ConnectionPool for JDBCInterpreter.

### What is this PR for?
This PR is for refactoring code for JDBCInterpreter.
The original code never actually put any 'Connection' back into 'propertyKeyUnusedConnectionListMap',
so that map was dead code.

### What type of PR is it?
Improvement

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-1405

### Questions:
* Do the license files need an update? no
* Is there breaking changes for older versions? no
* Does this need documentation? no

Author: astroshim <hsshim@nflabs.com>

Closes #1396 from astroshim/ZEPPELIN-1405 and squashes the following commits:

b07e162 [astroshim] add checking connection is null
f6998c2 [astroshim] Merge branch 'master' into ZEPPELIN-1405
1862ae6 [astroshim] Merge branch 'master' into ZEPPELIN-1405
efc2bfc [astroshim] rebase
21217a7 [astroshim] fix indentation.
4d4f85c [astroshim] refactoring code of close()
9f1e368 [astroshim] replace ConnectionPool
4dabbcc [astroshim] wip) changing to use dbcp
12dd7cb [astroshim] remove propertyKeyUnusedConnectionListMap map


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/b24491ba
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/b24491ba
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/b24491ba

Branch: refs/heads/master
Commit: b24491bafa78693457687dd5da460d5e387e9ddb
Parents: 46ecf77
Author: astroshim <hsshim@nflabs.com>
Authored: Fri Sep 23 16:31:19 2016 +0900
Committer: Jongyoul Lee <jongyoul@apache.org>
Committed: Wed Sep 28 23:10:37 2016 +0900

----------------------------------------------------------------------
 jdbc/pom.xml                                    |   6 +
 .../apache/zeppelin/jdbc/JDBCInterpreter.java   | 198 +++++++++----------
 2 files changed, 105 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b24491ba/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index f4e97c9..73c66c0 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -104,6 +104,12 @@
       <version>1.0.8</version>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.commons</groupId>
+      <artifactId>commons-dbcp2</artifactId>
+      <version>2.0.1</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/b24491ba/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
----------------------------------------------------------------------
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 0655f3a..5f784d7 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -19,15 +19,16 @@ import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.ResultSetMetaData;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.*;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.dbcp2.ConnectionFactory;
+import org.apache.commons.dbcp2.DriverManagerConnectionFactory;
+import org.apache.commons.dbcp2.PoolableConnectionFactory;
+import org.apache.commons.dbcp2.PoolingDriver;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.pool2.ObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -99,14 +100,15 @@ public class JDBCInterpreter extends Interpreter {
 
   static final String EMPTY_COLUMN_VALUE = "";
 
+
   private final String CONCURRENT_EXECUTION_KEY = "zeppelin.jdbc.concurrent.use";
   private final String CONCURRENT_EXECUTION_COUNT = "zeppelin.jdbc.concurrent.max_connection";
 
+  private final String DBCP_STRING = "jdbc:apache:commons:dbcp:";
+
   private final HashMap<String, Properties> propertiesMap;
   private final Map<String, Statement> paragraphIdStatementMap;
-
-  private final Map<String, ArrayList<Connection>> propertyKeyUnusedConnectionListMap;
-  private final Map<String, Connection> paragraphIdConnectionMap;
+  private final Map<String, PoolingDriver> poolingDriverMap;
 
   private final Map<String, SqlCompleter> propertyKeySqlCompleterMap;
 
@@ -122,9 +124,8 @@ public class JDBCInterpreter extends Interpreter {
   public JDBCInterpreter(Properties property) {
     super(property);
     propertiesMap = new HashMap<>();
-    propertyKeyUnusedConnectionListMap = new HashMap<>();
     paragraphIdStatementMap = new HashMap<>();
-    paragraphIdConnectionMap = new HashMap<>();
+    poolingDriverMap = new HashMap<>();
     propertyKeySqlCompleterMap = new HashMap<>();
   }
 
@@ -193,22 +194,41 @@ public class JDBCInterpreter extends Interpreter {
     return completer;
   }
 
+  private boolean isConnectionInPool(String driverName) {
+    if (poolingDriverMap.containsKey(driverName)) return true;
+    return false;
+  }
+
+  private void createConnectionPool(String url, String propertyKey, Properties properties)
{
+    ConnectionFactory connectionFactory =
+      new DriverManagerConnectionFactory(url, properties);
+
+    PoolableConnectionFactory poolableConnectionFactory = new PoolableConnectionFactory(
+      connectionFactory, null);
+    ObjectPool connectionPool = new GenericObjectPool(poolableConnectionFactory);
+
+    poolableConnectionFactory.setPool(connectionPool);
+    PoolingDriver driver = new PoolingDriver();
+    driver.registerPool(propertyKey, connectionPool);
+
+    poolingDriverMap.put(propertyKey, driver);
+  }
+
+  private Connection getConnectionFromPool(String url, String propertyKey, Properties properties)
+      throws SQLException {
+    if (!isConnectionInPool(propertyKey)) {
+      createConnectionPool(url, propertyKey, properties);
+    }
+
+    return DriverManager.getConnection(DBCP_STRING + propertyKey);
+  }
+
   public Connection getConnection(String propertyKey, String user)
       throws ClassNotFoundException, SQLException, InterpreterException {
     Connection connection = null;
     if (propertyKey == null || propertiesMap.get(propertyKey) == null) {
       return null;
     }
-    if (propertyKeyUnusedConnectionListMap.containsKey(propertyKey)) {
-      ArrayList<Connection> connectionList = propertyKeyUnusedConnectionListMap.get(propertyKey);
-      if (0 != connectionList.size()) {
-        connection = propertyKeyUnusedConnectionListMap.get(propertyKey).remove(0);
-        if (null != connection && connection.isClosed()) {
-          connection.close();
-          connection = null;
-        }
-      }
-    }
     if (null == connection) {
       final Properties properties = (Properties) propertiesMap.get(propertyKey).clone();
       logger.info(properties.getProperty(DRIVER_KEY));
@@ -222,16 +242,16 @@ public class JDBCInterpreter extends Interpreter {
         switch (authType) {
             case KERBEROS:
               if (user == null) {
-                connection = DriverManager.getConnection(url, properties);
+                connection = getConnectionFromPool(url, propertyKey, properties);
               } else {
                 if ("hive".equalsIgnoreCase(propertyKey)) {
-                  connection = DriverManager.getConnection(url + ";hive.server2.proxy.user="
+ user,
-                      properties);
+                  connection = getConnectionFromPool(url + ";hive.server2.proxy.user=" +
user,
+                    propertyKey, properties);
                 } else {
                   UserGroupInformation ugi = null;
                   try {
                     ugi = UserGroupInformation.createProxyUser(user,
-                        UserGroupInformation.getCurrentUser());
+                      UserGroupInformation.getCurrentUser());
                   } catch (Exception e) {
                     logger.error("Error in createProxyUser", e);
                     StringBuilder stringBuilder = new StringBuilder();
@@ -239,11 +259,13 @@ public class JDBCInterpreter extends Interpreter {
                     stringBuilder.append(e.getCause());
                     throw new InterpreterException(stringBuilder.toString());
                   }
+
+                  final String poolKey = propertyKey;
                   try {
                     connection = ugi.doAs(new PrivilegedExceptionAction<Connection>()
{
                       @Override
                       public Connection run() throws Exception {
-                        return DriverManager.getConnection(url, properties);
+                        return getConnectionFromPool(url, poolKey, properties);
                       }
                     });
                   } catch (Exception e) {
@@ -258,7 +280,7 @@ public class JDBCInterpreter extends Interpreter {
               break;
 
             default:
-              connection = DriverManager.getConnection(url, properties);
+              connection = getConnectionFromPool(url, propertyKey, properties);
         }
       }
     }
@@ -266,75 +288,41 @@ public class JDBCInterpreter extends Interpreter {
     return connection;
   }
 
-  public Statement getStatement(String propertyKey, String paragraphId,
-                                InterpreterContext interpreterContext)
-      throws SQLException, ClassNotFoundException, InterpreterException {
-    Connection connection;
-
-    if (paragraphIdConnectionMap.containsKey(paragraphId +
-        interpreterContext.getAuthenticationInfo().getUser())) {
-      connection = paragraphIdConnectionMap.get(paragraphId +
-          interpreterContext.getAuthenticationInfo().getUser());
-    } else {
-      connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
-    }
-
-    if (connection == null) {
-      return null;
+  private void initStatementMap() {
+    for (Statement statement : paragraphIdStatementMap.values()) {
+      try {
+        statement.close();
+      } catch (Exception e) {
+        logger.error("Error while closing paragraphIdStatementMap statement...", e);
+      }
     }
+    paragraphIdStatementMap.clear();
+  }
 
-    Statement statement = connection.createStatement();
-    if (isStatementClosed(statement)) {
-      connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
-      statement = connection.createStatement();
+  private void initConnectionPoolMap() throws SQLException {
+    Iterator<String> it = poolingDriverMap.keySet().iterator();
+    while (it.hasNext()) {
+      String driverName = it.next();
+      poolingDriverMap.get(driverName).closePool(driverName);
+      it.remove();
     }
-    paragraphIdConnectionMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(),
-        connection);
-    paragraphIdStatementMap.put(paragraphId + interpreterContext.getAuthenticationInfo().getUser(),
-        statement);
+    poolingDriverMap.clear();
+  }
 
-    return statement;
+  private void saveStatement(String key, Statement statement) throws SQLException {
+    paragraphIdStatementMap.put(key, statement);
+    statement.setMaxRows(getMaxResult());
   }
 
-  private boolean isStatementClosed(Statement statement) {
-    try {
-      return statement.isClosed();
-    } catch (Throwable t) {
-      logger.debug("{} doesn't support isClosed method", statement);
-      return false;
-    }
+  private void removeStatement(String key) {
+    paragraphIdStatementMap.remove(key);
   }
 
   @Override
   public void close() {
     try {
-      for (List<Connection> connectionList : propertyKeyUnusedConnectionListMap.values())
{
-        for (Connection c : connectionList) {
-          try {
-            c.close();
-          } catch (Exception e) {
-            logger.error("Error while closing propertyKeyUnusedConnectionListMap connection...",
e);
-          }
-        }
-      }
-
-      for (Statement statement : paragraphIdStatementMap.values()) {
-        try {
-          statement.close();
-        } catch (Exception e) {
-          logger.error("Error while closing paragraphIdStatementMap statement...", e);
-        }
-      }
-      paragraphIdStatementMap.clear();
-
-      for (Connection connection : paragraphIdConnectionMap.values()) {
-        try {
-          connection.close();
-        } catch (Exception e) {
-          logger.error("Error while closing paragraphIdConnectionMap connection...", e);
-        }
-      }
-      paragraphIdConnectionMap.clear();
+      initStatementMap();
+      initConnectionPoolMap();
     } catch (Exception e) {
       logger.error("Error while closing...", e);
     }
@@ -342,17 +330,21 @@ public class JDBCInterpreter extends Interpreter {
 
   private InterpreterResult executeSql(String propertyKey, String sql,
       InterpreterContext interpreterContext) {
-
     String paragraphId = interpreterContext.getParagraphId();
+    Connection connection;
+    Statement statement;
+    ResultSet resultSet = null;
 
     try {
+      connection = getConnection(propertyKey, interpreterContext.getAuthenticationInfo().getUser());
+      if (connection == null) {
+        return new InterpreterResult(Code.ERROR, "Prefix not found.");
+      }
 
-      Statement statement = getStatement(propertyKey, paragraphId, interpreterContext);
-
+      statement = connection.createStatement();
       if (statement == null) {
         return new InterpreterResult(Code.ERROR, "Prefix not found.");
       }
-      statement.setMaxRows(getMaxResult());
 
       StringBuilder msg = null;
       boolean isTableType = false;
@@ -364,8 +356,9 @@ public class JDBCInterpreter extends Interpreter {
         isTableType = true;
       }
 
-      ResultSet resultSet = null;
       try {
+        saveStatement(paragraphId +
+          interpreterContext.getAuthenticationInfo().getUser(), statement);
 
         boolean isResultSetAvailable = statement.execute(sql);
 
@@ -408,16 +401,24 @@ public class JDBCInterpreter extends Interpreter {
           msg.append(updateCount).append(NEWLINE);
         }
       } finally {
-        try {
-          if (resultSet != null) {
+        if (resultSet != null) {
+          try {
             resultSet.close();
-          }
-          statement.close();
-        } finally {
-          statement = null;
+          } catch (SQLException e) { /*ignored*/ }
         }
+        if (statement != null) {
+          try {
+            statement.close();
+          } catch (SQLException e) { /*ignored*/ }
+        }
+        if (connection != null) {
+          try {
+            connection.close();
+          } catch (SQLException e) { /*ignored*/ }
+        }
+        removeStatement(paragraphId +
+          interpreterContext.getAuthenticationInfo().getUser());
       }
-
       return new InterpreterResult(Code.SUCCESS, msg.toString());
 
     } catch (Exception e) {
@@ -452,7 +453,6 @@ public class JDBCInterpreter extends Interpreter {
     cmd = cmd.trim();
 
     logger.info("PropertyKey: {}, SQL command: '{}'", propertyKey, cmd);
-
     return executeSql(propertyKey, cmd, contextInterpreter);
   }
 


Mime
View raw message