calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [09/10] incubator-calcite git commit: [CALCITE-637] Implement Avatica CloseConnection RPC (Nick Dimiduk)
Date Sat, 28 Mar 2015 21:23:58 GMT
[CALCITE-637] Implement Avatica CloseConnection RPC (Nick Dimiduk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/18fea1f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/18fea1f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/18fea1f0

Branch: refs/heads/master
Commit: 18fea1f0f42a3bb68bb1053e37b1c4ee08769319
Parents: 0ad6019
Author: Nick Dimiduk <ndimiduk@gmail.com>
Authored: Tue Mar 24 16:04:28 2015 -0700
Committer: Julian Hyde <jhyde@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700

----------------------------------------------------------------------
 avatica/pom.xml                                 |  4 ++
 .../calcite/avatica/AvaticaConnection.java      |  1 +
 .../java/org/apache/calcite/avatica/Meta.java   |  3 ++
 .../org/apache/calcite/avatica/MetaImpl.java    | 13 +++++
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 56 ++++++++++++++++++--
 .../calcite/avatica/remote/JsonService.java     |  9 ++++
 .../calcite/avatica/remote/LocalService.java    |  6 +++
 .../calcite/avatica/remote/MockJsonService.java | 16 +++++-
 .../calcite/avatica/remote/RemoteMeta.java      |  5 ++
 .../apache/calcite/avatica/remote/Service.java  | 34 ++++++++++--
 .../calcite/avatica/test/RemoteDriverTest.java  |  4 +-
 pom.xml                                         |  5 ++
 12 files changed, 145 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/pom.xml
----------------------------------------------------------------------
diff --git a/avatica/pom.xml b/avatica/pom.xml
index 3e6040c..0e58323 100644
--- a/avatica/pom.xml
+++ b/avatica/pom.xml
@@ -41,6 +41,10 @@ limitations under the License.
       <artifactId>jackson-databind</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+    <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index edc2887..f05907f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -154,6 +154,7 @@ public abstract class AvaticaConnection implements Connection {
       // Per specification, if onConnectionClose throws, this method will throw
       // a SQLException, but statement will still be closed.
       try {
+        meta.closeConnection(handle);
         driver.handler.onConnectionClose(this);
       } catch (RuntimeException e) {
         throw helper.createException("While closing connection", e);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index 22ea681..ecd3ee3 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -200,6 +200,9 @@ public interface Meta {
    */
   void closeStatement(StatementHandle h);
 
+  /** Close a connection */
+  void closeConnection(ConnectionHandle ch);
+
   /** Factory to create instances of {@link Meta}. */
   interface Factory {
     Meta create(List<String> args);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index d6eca46..3ebff75 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -162,6 +162,19 @@ public abstract class MetaImpl implements Meta {
     }
   }
 
+  @Override public void closeConnection(ConnectionHandle ch) {
+    // TODO: implement
+    //
+    // lots of Calcite tests break with this simple implementation,
+    // requires investigation
+
+//    try {
+//      connection.close();
+//    } catch (SQLException e) {
+//      throw new RuntimeException(e);
+//    }
+  }
+
   public StatementHandle createStatement(ConnectionHandle ch) {
     return new StatementHandle(ch.id, connection.statementCount++, null);
   }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index f864614..24bdbda 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -20,6 +20,9 @@ import org.apache.calcite.avatica.AvaticaParameter;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.lang.reflect.Array;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
@@ -43,6 +46,9 @@ import java.util.UUID;
 
 /** Implementation of {@link Meta} upon an existing JDBC data source. */
 public class JdbcMeta implements Meta {
+
+  private static final Log LOG = LogFactory.getLog(JdbcMeta.class);
+
   /**
    * JDBC Types Mapped to Java Types
    *
@@ -417,7 +423,11 @@ public class JdbcMeta implements Meta {
       final Statement statement = conn.createStatement();
       final int id = System.identityHashCode(statement);
       statementMap.put(id, new StatementInfo(statement));
-      return new StatementHandle(ch.id, id, null);
+      StatementHandle h = new StatementHandle(ch.id, id, null);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("created statement " + h);
+      }
+      return h;
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -426,11 +436,17 @@ public class JdbcMeta implements Meta {
   @Override public void closeStatement(StatementHandle h) {
     Statement stmt = statementMap.get(h.id).statement;
     if (stmt == null) {
+      LOG.debug("client requested close unknown statement " + h);
       return;
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closing statement " + h);
+    }
     try {
-      assert stmt.getConnection() == connectionMap.get(h.connectionId);
+      boolean isOwned =
+          stmt.getConnection() == connectionMap.get(h.connectionId);
       stmt.close();
+      assert isOwned : "no connection found while closing " + h;
     } catch (SQLException e) {
       throw propagate(e);
     } finally {
@@ -438,6 +454,24 @@ public class JdbcMeta implements Meta {
     }
   }
 
+  @Override public void closeConnection(ConnectionHandle ch) {
+    Connection conn = connectionMap.get(ch.id);
+    if (conn == null) {
+      LOG.debug("client requested close unknown connection " + ch);
+      return;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closing connection " + ch);
+    }
+    try {
+      conn.close();
+    } catch (SQLException e) {
+      throw propagate(e);
+    } finally {
+      connectionMap.remove(ch.id);
+    }
+  }
+
   private RuntimeException propagate(Throwable e) {
     if (e instanceof RuntimeException) {
       throw (RuntimeException) e;
@@ -455,8 +489,12 @@ public class JdbcMeta implements Meta {
       final PreparedStatement statement = conn.prepareStatement(sql);
       final int id = System.identityHashCode(statement);
       statementMap.put(id, new StatementInfo(statement));
-      return new StatementHandle(ch.id, id, signature(statement.getMetaData(),
-          statement.getParameterMetaData(), sql));
+      StatementHandle h = new StatementHandle(ch.id, id, signature(
+          statement.getMetaData(), statement.getParameterMetaData(), sql));
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("prepared statement " + h);
+      }
+      return h;
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -471,7 +509,12 @@ public class JdbcMeta implements Meta {
       final StatementInfo info = new StatementInfo(statement);
       statementMap.put(id, info);
       info.resultSet = statement.executeQuery();
-      return JdbcResultSet.create(ch.id, id, info.resultSet);
+      MetaResultSet mrs = JdbcResultSet.create(ch.id, id, info.resultSet);
+      if (LOG.isTraceEnabled()) {
+        StatementHandle h = new StatementHandle(ch.id, id, null);
+        LOG.trace("prepAndExec statement " + h);
+      }
+      return mrs;
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -479,6 +522,9 @@ public class JdbcMeta implements Meta {
 
   public Frame fetch(StatementHandle h, List<Object> parameterValues,
       int offset, int fetchMaxRowCount) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount);
+    }
     final StatementInfo statementInfo = statementMap.get(h.id);
     try {
       assert statementInfo.statement.getConnection()

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
index 8ba08cf..dc4268b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/JsonService.java
@@ -138,6 +138,15 @@ public abstract class JsonService implements Service {
       throw handle(e);
     }
   }
+
+  @Override
+  public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    try {
+      return decode(apply(encode(request)), CloseConnectionResponse.class);
+    } catch (IOException e) {
+      throw handle(e);
+    }
+  }
 }
 
 // End JsonService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index 89717b6..719ef1d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -196,6 +196,12 @@ public class LocalService implements Service {
         request.connectionId, request.statementId, null));
     return new CloseStatementResponse();
   }
+
+  @Override
+  public CloseConnectionResponse apply(CloseConnectionRequest request) {
+    meta.closeConnection(new Meta.ConnectionHandle(request.connectionId));
+    return new CloseConnectionResponse();
+  }
 }
 
 // End LocalService.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
index 19e59e3..02cb191 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
@@ -36,13 +36,27 @@ public class MockJsonService extends JsonService {
   }
 
   @Override public String apply(String request) {
-    final String response = map.get(request);
+    String response = map.get(request);
+    if (response == null) {
+      response = handleCloseConnection(request);
+    }
     if (response == null) {
       throw new RuntimeException("No response for " + request);
     }
     return response;
   }
 
+  /**
+   * Special case for closeConnection because connection IDs are random.
+   * @return response if is a CloseConnectionRequest, null otherwise.
+   */
+  private static String handleCloseConnection(String request) {
+    if (request.contains("closeConnection")) {
+      return "{\"response\":\"closeConnection\"}";
+    }
+    return null;
+  }
+
   /** Factory that creates a {@code MockJsonService}. */
   public static class Factory implements Service.Factory {
     public Service create(AvaticaConnection connection) {

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index 0a5a3a0..e72ad8b 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -64,6 +64,11 @@ class RemoteMeta extends MetaImpl {
         service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
   }
 
+  @Override public void closeConnection(ConnectionHandle ch) {
+    final Service.CloseConnectionResponse response =
+        service.apply(new Service.CloseConnectionRequest(ch.id));
+  }
+
   @Override public MetaResultSet getCatalogs() {
     final Service.ResultSetResponse response =
         service.apply(new Service.CatalogsRequest());

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index ea03348..c56524d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -40,6 +40,7 @@ public interface Service {
   FetchResponse apply(FetchRequest request);
   CreateStatementResponse apply(CreateStatementRequest request);
   CloseStatementResponse apply(CloseStatementRequest request);
+  CloseConnectionResponse apply(CloseConnectionRequest request);
 
   /** Factory that creates a {@code Service}. */
   interface Factory {
@@ -65,7 +66,9 @@ public interface Service {
       @JsonSubTypes.Type(value = CreateStatementRequest.class,
           name = "createStatement"),
       @JsonSubTypes.Type(value = CloseStatementRequest.class,
-          name = "closeStatement") })
+          name = "closeStatement"),
+      @JsonSubTypes.Type(value = CloseConnectionRequest.class,
+          name = "closeConnection") })
   abstract class Request {
     abstract Response accept(Service service);
   }
@@ -82,7 +85,9 @@ public interface Service {
       @JsonSubTypes.Type(value = CreateStatementResponse.class,
           name = "createStatement"),
       @JsonSubTypes.Type(value = CloseStatementResponse.class,
-          name = "closeStatement") })
+          name = "closeStatement"),
+      @JsonSubTypes.Type(value = CloseConnectionResponse.class,
+          name = "closeConnection") })
   abstract class Response {
   }
 
@@ -202,7 +207,7 @@ public interface Service {
   }
 
   /** Request for
-   * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.StatementHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
+   * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
   class PrepareAndExecuteRequest extends Request {
     public final String connectionId;
     public final String sql;
@@ -356,6 +361,29 @@ public interface Service {
     @JsonCreator
     public CloseStatementResponse() {}
   }
+
+  /** Request for
+   * {@link Meta#closeConnection(org.apache.calcite.avatica.Meta.ConnectionHandle)}. */
+  class CloseConnectionRequest extends Request {
+    public final String connectionId;
+
+    @JsonCreator
+    public CloseConnectionRequest(
+        @JsonProperty("connectionId") String connectionId) {
+      this.connectionId = connectionId;
+    }
+
+    @Override CloseConnectionResponse accept(Service service) {
+      return service.apply(this);
+    }
+  }
+
+  /** Response from
+   * {@link org.apache.calcite.avatica.remote.Service.CloseConnectionRequest}. */
+  class CloseConnectionResponse extends Response {
+    @JsonCreator
+    public CloseConnectionResponse() {}
+  }
 }
 
 // End Service.java

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index 6713632..db4b42a 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -236,8 +236,8 @@ public class RemoteDriverTest {
         s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
     conn2.close();
     conn1.close();
-//    assertEquals("closing a connection closes the server-side connection",
-//        1, connectionMap.size());
+    assertEquals("closing a connection closes the server-side connection",
+        1, connectionMap.size());
   }
 
   private void checkStatementExecuteQuery(Connection connection)

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/18fea1f0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index f870774..7d26118 100644
--- a/pom.xml
+++ b/pom.xml
@@ -191,6 +191,11 @@ limitations under the License.
         <version>3.2</version>
       </dependency>
       <dependency>
+        <groupId>commons-logging</groupId>
+        <artifactId>commons-logging</artifactId>
+        <version>1.1.3</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.janino</groupId>
         <artifactId>janino</artifactId>
         <version>2.7.6</version>


Mime
View raw message