calcite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jh...@apache.org
Subject [08/10] incubator-calcite git commit: [CALCITE-636] Connection isolation for Avatica clients (Nick Dimiduk)
Date Sat, 28 Mar 2015 21:23:57 GMT
[CALCITE-636] Connection isolation for Avatica clients (Nick Dimiduk)


Project: http://git-wip-us.apache.org/repos/asf/incubator-calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-calcite/commit/0ad6019b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-calcite/tree/0ad6019b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-calcite/diff/0ad6019b

Branch: refs/heads/master
Commit: 0ad6019bc9efa9e6ecc36647d27e258aad3f4eaa
Parents: 208eda6
Author: Nick Dimiduk <ndimiduk@gmail.com>
Authored: Tue Mar 24 15:24:45 2015 -0700
Committer: Julian Hyde <jhyde@apache.org>
Committed: Fri Mar 27 20:01:27 2015 -0700

----------------------------------------------------------------------
 .../calcite/avatica/AvaticaConnection.java      |  24 ++--
 .../calcite/avatica/AvaticaStatement.java       |   8 +-
 .../java/org/apache/calcite/avatica/Meta.java   |  36 ++++--
 .../org/apache/calcite/avatica/MetaImpl.java    |   5 +-
 .../calcite/avatica/UnregisteredDriver.java     |   2 +-
 .../apache/calcite/avatica/jdbc/JdbcMeta.java   | 120 ++++++++++++++-----
 .../calcite/avatica/jdbc/JdbcResultSet.java     |  13 +-
 .../calcite/avatica/remote/LocalService.java    |  44 ++++---
 .../calcite/avatica/remote/RemoteMeta.java      |  27 +++--
 .../apache/calcite/avatica/remote/Service.java  |  54 ++++++---
 .../calcite/avatica/test/RemoteDriverTest.java  | 101 +++++++++++++---
 .../apache/calcite/jdbc/CalciteMetaImpl.java    |  17 ++-
 12 files changed, 314 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
index 293cc6c..edc2887 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
 import java.util.TimeZone;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
@@ -57,7 +58,8 @@ public abstract class AvaticaConnection implements Connection {
   private int networkTimeout;
   private String catalog;
 
-  public final int id;
+  public final String id;
+  public final Meta.ConnectionHandle handle;
   protected final UnregisteredDriver driver;
   protected final AvaticaFactory factory;
   final String url;
@@ -70,8 +72,6 @@ public abstract class AvaticaConnection implements Connection {
   public final Map<Integer, AvaticaStatement> statementMap =
       new ConcurrentHashMap<>();
 
-  private static int nextId;
-
   /**
    * Creates an AvaticaConnection.
    *
@@ -87,7 +87,8 @@ public abstract class AvaticaConnection implements Connection {
       AvaticaFactory factory,
       String url,
       Properties info) {
-    this.id = nextId++;
+    this.id = UUID.randomUUID().toString();
+    this.handle = new Meta.ConnectionHandle(this.id);
     this.driver = driver;
     this.factory = factory;
     this.url = url;
@@ -272,12 +273,9 @@ public abstract class AvaticaConnection implements Connection {
       int resultSetConcurrency,
       int resultSetHoldability) throws SQLException {
     try {
-      // TODO: cut out round-trip to create a statement handle
       final Meta.ConnectionHandle ch = new Meta.ConnectionHandle(id);
-      final Meta.StatementHandle h = meta.createStatement(ch);
-
-      final Meta.Signature x = meta.prepare(h, sql, -1);
-      return factory.newPreparedStatement(this, h, x, resultSetType,
+      final Meta.StatementHandle h = meta.prepare(ch, sql, -1);
+      return factory.newPreparedStatement(this, h, h.signature, resultSetType,
           resultSetConcurrency, resultSetHoldability);
     } catch (RuntimeException e) {
       throw helper.createException("while preparing SQL: " + sql, e);
@@ -440,8 +438,8 @@ public abstract class AvaticaConnection implements Connection {
   protected ResultSet prepareAndExecuteInternal(
       final AvaticaStatement statement, String sql, int maxRowCount)
       throws SQLException {
-    Meta.MetaResultSet x = meta.prepareAndExecute(statement.handle, sql,
-        maxRowCount, new Meta.PrepareCallback() {
+    Meta.MetaResultSet x = meta.prepareAndExecute(handle, sql, maxRowCount,
+        new Meta.PrepareCallback() {
           public Object getMonitor() {
             return statement;
           }
@@ -477,8 +475,8 @@ public abstract class AvaticaConnection implements Connection {
 
   protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet)
       throws SQLException {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(metaResultSet.statementId);
+    final Meta.StatementHandle h = new Meta.StatementHandle(
+        metaResultSet.connectionId, metaResultSet.statementId, null);
     final AvaticaStatement statement = lookupStatement(h);
     return executeQueryInternal(statement, metaResultSet.signature.sanitize(),
         metaResultSet.firstFrame);

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 2ad74bf..8276b07 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -92,8 +92,12 @@ public abstract class AvaticaStatement
     try {
       // In JDBC, maxRowCount = 0 means no limit; in prepare it means LIMIT 0
       final int maxRowCount1 = maxRowCount <= 0 ? -1 : maxRowCount;
-      Meta.Signature x = connection.meta.prepare(handle, sql, maxRowCount1);
-      return executeInternal(x);
+      ResultSet resultSet =
+          connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+      if (resultSet.isClosed()) {
+        return false;
+      }
+      return true;
     } catch (RuntimeException e) {
       throw connection.helper.createException("while executing SQL: " + sql, e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index cd0166c..22ea681 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -153,22 +153,22 @@ public interface Meta {
 
   /** Prepares a statement.
    *
-   * @param h Statement handle
+   * @param ch Connection handle
    * @param sql SQL query
    * @param maxRowCount Negative for no limit (different meaning than JDBC)
    * @return Signature of prepared statement
    */
-  Signature prepare(StatementHandle h, String sql, int maxRowCount);
+  StatementHandle prepare(ConnectionHandle ch, String sql, int maxRowCount);
 
   /** Prepares and executes a statement.
    *
-   * @param h Statement handle
+   * @param ch Connection handle
    * @param sql SQL query
    * @param maxRowCount Negative for no limit (different meaning than JDBC)
    * @param callback Callback to lock, clear and assign cursor
-   * @return Signature of prepared statement
+   * @return MetaResultSet containing statement ID and first frame of data
    */
-  MetaResultSet prepareAndExecute(StatementHandle h, String sql,
+  MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
       int maxRowCount, PrepareCallback callback);
 
   /** Returns a frame of rows.
@@ -223,14 +223,16 @@ public interface Meta {
 
   /** Meta data from which a result set can be constructed. */
   class MetaResultSet {
+    public final String connectionId;
     public final int statementId;
     public final boolean ownStatement;
     public final Frame firstFrame;
     public final Signature signature;
 
-    public MetaResultSet(int statementId, boolean ownStatement,
-        Signature signature, Frame firstFrame) {
+    public MetaResultSet(String connectionId, int statementId,
+        boolean ownStatement, Signature signature, Frame firstFrame) {
       this.signature = Objects.requireNonNull(signature);
+      this.connectionId = connectionId;
       this.statementId = statementId;
       this.ownStatement = ownStatement;
       this.firstFrame = firstFrame; // may be null
@@ -430,29 +432,39 @@ public interface Meta {
 
   /** Connection handle. */
   class ConnectionHandle {
-    public final int id;
+    public final String id;
 
     @Override public String toString() {
-      return Integer.toString(id);
+      return id;
     }
 
     @JsonCreator
-    public ConnectionHandle(@JsonProperty("id") int id) {
+    public ConnectionHandle(@JsonProperty("id") String id) {
       this.id = id;
     }
   }
 
   /** Statement handle. */
   class StatementHandle {
+    public final String connectionId;
     public final int id;
 
+    // not final because LocalService#apply(PrepareRequest)
+    /** Only present for PreparedStatement handles, null otherwise. */
+    public Signature signature;
+
     @Override public String toString() {
-      return Integer.toString(id);
+      return connectionId + "::" + Integer.toString(id);
     }
 
     @JsonCreator
-    public StatementHandle(@JsonProperty("id") int id) {
+    public StatementHandle(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("id") int id,
+        @JsonProperty("signature") Signature signature) {
+      this.connectionId = connectionId;
       this.id = id;
+      this.signature = signature;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index a6c5f3c..d6eca46 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -163,7 +163,7 @@ public abstract class MetaImpl implements Meta {
   }
 
   public StatementHandle createStatement(ConnectionHandle ch) {
-    return new StatementHandle(connection.statementCount++);
+    return new StatementHandle(ch.id, connection.statementCount++, null);
   }
 
   /** Creates an empty result set. Useful for JDBC metadata methods that are
@@ -215,7 +215,8 @@ public abstract class MetaImpl implements Meta {
       final Signature signature =
           new Signature(columns, "", Collections.<AvaticaParameter>emptyList(),
               internalParameters, cursorFactory);
-      return new MetaResultSet(statement.getId(), true, signature, firstFrame);
+      return new MetaResultSet(connection.id, statement.getId(), true,
+          signature, firstFrame);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
index fefd5f4..5f8d492 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
@@ -42,7 +42,7 @@ import java.util.logging.Logger;
  *
  * <p>The provider must implement:</p>
  * <ul>
- *   <li>{@link Meta#prepare(org.apache.calcite.avatica.Meta.StatementHandle, String, int)}
+ *   <li>{@link Meta#prepare(Meta.ConnectionHandle, String, int)}
  *   <li>{@link Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)}
  * </ul>
  */

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
index 72c3948..f864614 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Array;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.sql.Connection;
+import java.sql.DriverManager;
 import java.sql.ParameterMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -37,6 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
 
 /** Implementation of {@link Meta} upon an existing JDBC data source. */
 public class JdbcMeta implements Meta {
@@ -72,6 +75,13 @@ public class JdbcMeta implements Meta {
     SQL_TYPE_TO_JAVA_TYPE.put(Types.ARRAY, Array.class);
   }
 
+  private static final String DEFAULT_CONN_ID =
+      UUID.fromString("00000000-0000-0000-0000-000000000000").toString();
+
+  private final String url;
+  private final Properties info;
+  private final Connection connection; // TODO: remove default connection
+  private final Map<String, Connection> connectionMap = new HashMap<>();
   private final Map<Integer, StatementInfo> statementMap = new HashMap<>();
 
   /**
@@ -124,7 +134,7 @@ public class JdbcMeta implements Meta {
   protected static Signature signature(ResultSetMetaData metaData,
       ParameterMetaData parameterMetaData, String sql) throws  SQLException {
     return new Signature(columns(metaData), sql, parameters(parameterMetaData),
-        null, CursorFactory.ARRAY);
+        null, CursorFactory.LIST /* LIST because JdbcResultSet#frame */);
   }
 
   protected static Signature signature(ResultSetMetaData metaData)
@@ -132,10 +142,43 @@ public class JdbcMeta implements Meta {
     return signature(metaData, null, null);
   }
 
-  protected final Connection connection;
+  /**
+   * @param url a database url of the form
+   *  <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
+   */
+  public JdbcMeta(String url) throws SQLException {
+    this(url, new Properties());
+  }
 
-  public JdbcMeta(Connection connection) {
-    this.connection = connection;
+  /**
+   * @param url a database url of the form
+   * <code>jdbc:<em>subprotocol</em>:<em>subname</em></code>
+   * @param user the database user on whose behalf the connection is being
+   *   made
+   * @param password the user's password
+   */
+  public JdbcMeta(final String url, final String user, final String password)
+      throws SQLException {
+    this(url, new Properties() {
+      {
+        put("user", user);
+        put("password", password);
+      }
+    });
+  }
+
+  /**
+   * @param url a database url of the form
+   * <code> jdbc:<em>subprotocol</em>:<em>subname</em></code>
+   * @param info a list of arbitrary string tag/value pairs as
+   * connection arguments; normally at least a "user" and
+   * "password" property should be included
+   */
+  public JdbcMeta(String url, Properties info) throws SQLException {
+    this.url = url;
+    this.info = info;
+    this.connection = DriverManager.getConnection(url, info);
+    this.connectionMap.put(DEFAULT_CONN_ID, connection);
   }
 
   public String getSqlKeywords() {
@@ -170,7 +213,7 @@ public class JdbcMeta implements Meta {
       Pat tableNamePattern, List<String> typeList) {
     try {
       String[] types = new String[typeList == null ? 0 : typeList.size()];
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getTables(catalog, schemaPattern.s,
               tableNamePattern.s,
               typeList == null ? types : typeList.toArray(types)));
@@ -182,7 +225,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getColumns(String catalog, Pat schemaPattern,
       Pat tableNamePattern, Pat columnNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getColumns(catalog, schemaPattern.s,
               tableNamePattern.s, columnNamePattern.s));
     } catch (SQLException e) {
@@ -192,7 +235,7 @@ public class JdbcMeta implements Meta {
 
   public MetaResultSet getSchemas(String catalog, Pat schemaPattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getSchemas(catalog, schemaPattern.s));
     } catch (SQLException e) {
       throw new RuntimeException(e);
@@ -201,7 +244,8 @@ public class JdbcMeta implements Meta {
 
   public MetaResultSet getCatalogs() {
     try {
-      return JdbcResultSet.create(connection.getMetaData().getCatalogs());
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
+          connection.getMetaData().getCatalogs());
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -209,7 +253,8 @@ public class JdbcMeta implements Meta {
 
   public MetaResultSet getTableTypes() {
     try {
-      return JdbcResultSet.create(connection.getMetaData().getTableTypes());
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
+          connection.getMetaData().getTableTypes());
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -218,7 +263,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getProcedures(String catalog, Pat schemaPattern,
       Pat procedureNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getProcedures(catalog, schemaPattern.s,
               procedureNamePattern.s));
     } catch (SQLException e) {
@@ -229,7 +274,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getProcedureColumns(String catalog, Pat schemaPattern,
       Pat procedureNamePattern, Pat columnNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getProcedureColumns(catalog,
               schemaPattern.s, procedureNamePattern.s, columnNamePattern.s));
     } catch (SQLException e) {
@@ -240,7 +285,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getColumnPrivileges(String catalog, String schema,
       String table, Pat columnNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getColumnPrivileges(catalog, schema,
               table, columnNamePattern.s));
     } catch (SQLException e) {
@@ -251,7 +296,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getTablePrivileges(String catalog, Pat schemaPattern,
       Pat tableNamePattern) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getTablePrivileges(catalog,
               schemaPattern.s, tableNamePattern.s));
     } catch (SQLException e) {
@@ -262,7 +307,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getBestRowIdentifier(String catalog, String schema,
       String table, int scope, boolean nullable) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getBestRowIdentifier(catalog, schema,
               table, scope, nullable));
     } catch (SQLException e) {
@@ -273,7 +318,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getVersionColumns(String catalog, String schema,
       String table) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getVersionColumns(catalog, schema, table));
     } catch (SQLException e) {
       throw new RuntimeException(e);
@@ -283,7 +328,7 @@ public class JdbcMeta implements Meta {
   public MetaResultSet getPrimaryKeys(String catalog, String schema,
       String table) {
     try {
-      return JdbcResultSet.create(
+      return JdbcResultSet.create(DEFAULT_CONN_ID, -1,
           connection.getMetaData().getPrimaryKeys(catalog, schema, table));
     } catch (SQLException e) {
       throw new RuntimeException(e);
@@ -359,12 +404,20 @@ public class JdbcMeta implements Meta {
     return null;
   }
 
+  protected Connection getConnection(String id) throws SQLException {
+    if (connectionMap.get(id) == null) {
+      connectionMap.put(id, DriverManager.getConnection(url, info));
+    }
+    return connectionMap.get(id);
+  }
+
   public StatementHandle createStatement(ConnectionHandle ch) {
     try {
-      final Statement statement = connection.createStatement();
-      final int id = statementMap.size();
+      final Connection conn = getConnection(ch.id);
+      final Statement statement = conn.createStatement();
+      final int id = System.identityHashCode(statement);
       statementMap.put(id, new StatementInfo(statement));
-      return new StatementHandle(id);
+      return new StatementHandle(ch.id, id, null);
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -376,6 +429,7 @@ public class JdbcMeta implements Meta {
       return;
     }
     try {
+      assert stmt.getConnection() == connectionMap.get(h.connectionId);
       stmt.close();
     } catch (SQLException e) {
       throw propagate(e);
@@ -394,24 +448,30 @@ public class JdbcMeta implements Meta {
     }
   }
 
-  public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
-    // TODO: can't actually prepare an existing statement...
+  public StatementHandle prepare(ConnectionHandle ch, String sql,
+      int maxRowCount) {
     try {
-      PreparedStatement statement = connection.prepareStatement(sql);
-      statementMap.put(h.id, new StatementInfo(statement));
-      return signature(statement.getMetaData(),
-          statement.getParameterMetaData(), sql);
+      final Connection conn = getConnection(ch.id);
+      final PreparedStatement statement = conn.prepareStatement(sql);
+      final int id = System.identityHashCode(statement);
+      statementMap.put(id, new StatementInfo(statement));
+      return new StatementHandle(ch.id, id, signature(statement.getMetaData(),
+          statement.getParameterMetaData(), sql));
     } catch (SQLException e) {
       throw propagate(e);
     }
   }
 
-  public MetaResultSet prepareAndExecute(StatementHandle h, String sql,
+  public MetaResultSet prepareAndExecute(ConnectionHandle ch, String sql,
       int maxRowCount, PrepareCallback callback) {
-    final StatementInfo statementInfo = statementMap.get(h.id);
     try {
-      statementInfo.resultSet = statementInfo.statement.executeQuery(sql);
-      return JdbcResultSet.create(statementInfo.resultSet);
+      final Connection connection = getConnection(ch.id);
+      final PreparedStatement statement = connection.prepareStatement(sql);
+      final int id = System.identityHashCode(statement);
+      final StatementInfo info = new StatementInfo(statement);
+      statementMap.put(id, info);
+      info.resultSet = statement.executeQuery();
+      return JdbcResultSet.create(ch.id, id, info.resultSet);
     } catch (SQLException e) {
       throw propagate(e);
     }
@@ -421,6 +481,8 @@ public class JdbcMeta implements Meta {
       int offset, int fetchMaxRowCount) {
     final StatementInfo statementInfo = statementMap.get(h.id);
     try {
+      assert statementInfo.statement.getConnection()
+          == connectionMap.get(h.connectionId);
       if (statementInfo.resultSet == null || parameterValues != null) {
         if (statementInfo.resultSet != null) {
           statementInfo.resultSet.close();

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
index ac4aeb5..827f31d 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java
@@ -30,19 +30,20 @@ import java.util.List;
  *
  *  @see org.apache.calcite.avatica.jdbc.JdbcMeta */
 class JdbcResultSet extends Meta.MetaResultSet {
-  protected JdbcResultSet(int statementId, boolean ownStatement,
-      Meta.Signature signature, Meta.Frame firstFrame) {
-    super(statementId, ownStatement, signature, firstFrame);
+  protected JdbcResultSet(String connectionId, int statementId,
+      boolean ownStatement, Meta.Signature signature, Meta.Frame firstFrame) {
+    super(connectionId, statementId, ownStatement, signature, firstFrame);
   }
 
   /** Creates a result set. */
-  public static JdbcResultSet create(ResultSet resultSet) {
+  public static JdbcResultSet create(String connectionId, int statementId,
+      ResultSet resultSet) {
     try {
-      int id = resultSet.getStatement().hashCode();
       Meta.Signature sig = JdbcMeta.signature(resultSet.getMetaData());
       final Meta.Frame firstFrame = frame(resultSet, 0, -1);
       resultSet.close();
-      return new JdbcResultSet(id, true, sig, firstFrame);
+      return new JdbcResultSet(connectionId, statementId, true, sig,
+          firstFrame);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
index e302b85..89717b6 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/LocalService.java
@@ -59,6 +59,7 @@ public class LocalService implements Service {
         cursorFactory = Meta.CursorFactory.LIST;
         break;
       case MAP:
+      case LIST:
         break;
       default:
         cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames);
@@ -72,13 +73,13 @@ public class LocalService implements Service {
     if (cursorFactory != resultSet.signature.cursorFactory) {
       signature = signature.setCursorFactory(cursorFactory);
     }
-    return new ResultSetResponse(resultSet.statementId, resultSet.ownStatement,
-        signature, new Meta.Frame(0, true, list));
+    return new ResultSetResponse(resultSet.connectionId, resultSet.statementId,
+        resultSet.ownStatement, signature, new Meta.Frame(0, true, list));
   }
 
   private List<List<Object>> list2(Meta.MetaResultSet resultSet) {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(resultSet.statementId);
+    final Meta.StatementHandle h = new Meta.StatementHandle(
+        resultSet.connectionId, resultSet.statementId, null);
     final Iterable<Object> iterable = meta.createIterable(h,
         resultSet.signature, Collections.emptyList(), resultSet.firstFrame);
     final List<List<Object>> list = new ArrayList<>();
@@ -118,12 +119,12 @@ public class LocalService implements Service {
   }
 
   public PrepareResponse apply(PrepareRequest request) {
+    final Meta.ConnectionHandle ch =
+        new Meta.ConnectionHandle(request.connectionId);
     final Meta.StatementHandle h =
-        new Meta.StatementHandle(request.statementId);
-    Meta.Signature signature =
-        meta.prepare(h, request.sql, request.maxRowCount)
-            .setCursorFactory(Meta.CursorFactory.LIST);
+        meta.prepare(ch, request.sql, request.maxRowCount);
     if (json) {
+      Meta.Signature signature = h.signature;
       final List<ColumnMetaData> columns = new ArrayList<>();
       for (ColumnMetaData column : signature.columns) {
         switch (column.type.rep) {
@@ -146,33 +147,37 @@ public class LocalService implements Service {
       signature = new Meta.Signature(columns, signature.sql,
           signature.parameters, signature.internalParameters,
           signature.cursorFactory);
+      h.signature = signature;
     }
-    return new PrepareResponse(signature);
+    return new PrepareResponse(h);
   }
 
   public ResultSetResponse apply(PrepareAndExecuteRequest request) {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(request.statementId);
+    final Meta.ConnectionHandle ch =
+        new Meta.ConnectionHandle(request.connectionId);
     final Meta.MetaResultSet resultSet =
-        meta.prepareAndExecute(h, request.sql, request.maxRowCount,
+        meta.prepareAndExecute(ch, request.sql, request.maxRowCount,
             new Meta.PrepareCallback() {
               @Override public Object getMonitor() {
                 return LocalService.class;
               }
 
-              @Override public void clear() {}
+              @Override public void clear() {
+              }
 
               @Override public void assign(Meta.Signature signature,
-                  Meta.Frame firstFrame) {}
+                  Meta.Frame firstFrame) {
+              }
 
-              @Override public void execute() {}
+              @Override public void execute() {
+              }
             });
     return toResponse(resultSet);
   }
 
   public FetchResponse apply(FetchRequest request) {
-    final Meta.StatementHandle h =
-        new Meta.StatementHandle(request.statementId);
+    final Meta.StatementHandle h = new Meta.StatementHandle(
+        request.connectionId, request.statementId, null);
     final Meta.Frame frame =
         meta.fetch(h, request.parameterValues, request.offset,
             request.fetchMaxRowCount);
@@ -182,12 +187,13 @@ public class LocalService implements Service {
   public CreateStatementResponse apply(CreateStatementRequest request) {
     final Meta.StatementHandle h =
         meta.createStatement(new Meta.ConnectionHandle(request.connectionId));
-    return new CreateStatementResponse(h.id);
+    return new CreateStatementResponse(h.connectionId, h.id);
   }
 
   @Override
   public CloseStatementResponse apply(CloseStatementRequest request) {
-    meta.closeStatement(new Meta.StatementHandle(request.statementId));
+    meta.closeStatement(new Meta.StatementHandle(
+        request.connectionId, request.statementId, null));
     return new CloseStatementResponse();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index e8bbfb5..0a5a3a0 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -48,19 +48,20 @@ class RemoteMeta extends MetaImpl {
       signature0 = Signature.create(columns,
           "?", Collections.<AvaticaParameter>emptyList(), CursorFactory.ARRAY);
     }
-    return new MetaResultSet(response.statementId, response.ownStatement,
-        signature0, response.firstFrame);
+    return new MetaResultSet(response.connectionId, response.statementId,
+        response.ownStatement, signature0, response.firstFrame);
   }
 
   @Override public StatementHandle createStatement(ConnectionHandle ch) {
     final Service.CreateStatementResponse response =
         service.apply(new Service.CreateStatementRequest(ch.id));
-    return new StatementHandle(response.id);
+    return new StatementHandle(response.connectionId, response.statementId,
+        null);
   }
 
   @Override public void closeStatement(StatementHandle h) {
     final Service.CloseStatementResponse response =
-        service.apply(new Service.CloseStatementRequest(h.id));
+        service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
   }
 
   @Override public MetaResultSet getCatalogs() {
@@ -99,21 +100,21 @@ class RemoteMeta extends MetaImpl {
     return toResultSet(MetaColumn.class, response);
   }
 
-  @Override public Signature prepare(StatementHandle h, String sql,
+  @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
       int maxRowCount) {
-    final Service.PrepareResponse response =
-        service.apply(new Service.PrepareRequest(h.id, sql, maxRowCount));
-    return response.signature;
+    final Service.PrepareResponse response = service.apply(
+        new Service.PrepareRequest(ch.id, sql, maxRowCount));
+    return response.statement;
   }
 
-  @Override public MetaResultSet prepareAndExecute(StatementHandle h,
+  @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
       String sql, int maxRowCount, PrepareCallback callback) {
     final Service.ResultSetResponse response;
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
-        response = service.apply(
-            new Service.PrepareAndExecuteRequest(h.id, sql, maxRowCount));
+        response = service.apply(new Service.PrepareAndExecuteRequest(
+            ch.id, sql, maxRowCount));
         callback.assign(response.signature, response.firstFrame);
       }
       callback.execute();
@@ -127,8 +128,8 @@ class RemoteMeta extends MetaImpl {
       int offset, int fetchMaxRowCount) {
     final Service.FetchResponse response =
         service.apply(
-            new Service.FetchRequest(h.id, parameterValues, offset,
-                fetchMaxRowCount));
+            new Service.FetchRequest(h.connectionId, h.id, parameterValues,
+                offset, fetchMaxRowCount));
     return response.frame;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index bbe3552..ea03348 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -180,16 +180,20 @@ public interface Service {
    * {@link Meta#getTableTypes()}
    * return this response. */
   class ResultSetResponse extends Response {
+    public final String connectionId;
     public final int statementId;
     public final boolean ownStatement;
     public final Meta.Signature signature;
     public final Meta.Frame firstFrame;
 
     @JsonCreator
-    public ResultSetResponse(@JsonProperty("statementId") int statementId,
+    public ResultSetResponse(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId,
         @JsonProperty("ownStatement") boolean ownStatement,
         @JsonProperty("signature") Meta.Signature signature,
         @JsonProperty("firstFrame") Meta.Frame firstFrame) {
+      this.connectionId = connectionId;
       this.statementId = statementId;
       this.ownStatement = ownStatement;
       this.signature = signature;
@@ -200,16 +204,16 @@ public interface Service {
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#prepareAndExecute(org.apache.calcite.avatica.Meta.StatementHandle, String, int, org.apache.calcite.avatica.Meta.PrepareCallback)}. */
   class PrepareAndExecuteRequest extends Request {
-    public final int statementId;
+    public final String connectionId;
     public final String sql;
     public final int maxRowCount;
 
     @JsonCreator
     public PrepareAndExecuteRequest(
-        @JsonProperty("statementId") int statementId,
+        @JsonProperty("connectionId") String connectionId,
         @JsonProperty("sql") String sql,
         @JsonProperty("maxRowCount") int maxRowCount) {
-      this.statementId = statementId;
+      this.connectionId = connectionId;
       this.sql = sql;
       this.maxRowCount = maxRowCount;
     }
@@ -220,17 +224,18 @@ public interface Service {
   }
 
   /** Request for
-   * {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.StatementHandle, String, int)}. */
+   * {@link org.apache.calcite.avatica.Meta#prepare(org.apache.calcite.avatica.Meta.ConnectionHandle, String, int)}. */
   class PrepareRequest extends Request {
-    public final int statementId;
+    public final String connectionId;
     public final String sql;
     public final int maxRowCount;
 
     @JsonCreator
-    public PrepareRequest(@JsonProperty("statementId") int statementId,
+    public PrepareRequest(
+        @JsonProperty("connectionId") String connectionId,
         @JsonProperty("sql") String sql,
         @JsonProperty("maxRowCount") int maxRowCount) {
-      this.statementId = statementId;
+      this.connectionId = connectionId;
       this.sql = sql;
       this.maxRowCount = maxRowCount;
     }
@@ -243,18 +248,19 @@ public interface Service {
   /** Response from
    * {@link org.apache.calcite.avatica.remote.Service.PrepareRequest}. */
   class PrepareResponse extends Response {
-    public final Meta.Signature signature;
+    public final Meta.StatementHandle statement;
 
     @JsonCreator
     public PrepareResponse(
-        @JsonProperty("signature") Meta.Signature signature) {
-      this.signature = signature;
+        @JsonProperty("statement") Meta.StatementHandle statement) {
+      this.statement = statement;
     }
   }
 
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#fetch(Meta.StatementHandle, List, int, int)}. */
   class FetchRequest extends Request {
+    public final String connectionId;
     public final int statementId;
     public final int offset;
     /** Maximum number of rows to be returned in the frame. Negative means no
@@ -265,10 +271,13 @@ public interface Service {
     public final List<Object> parameterValues;
 
     @JsonCreator
-    public FetchRequest(@JsonProperty("statementId") int statementId,
+    public FetchRequest(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId,
         @JsonProperty("parameterValues") List<Object> parameterValues,
         @JsonProperty("offset") int offset,
         @JsonProperty("fetchMaxRowCount") int fetchMaxRowCount) {
+      this.connectionId = connectionId;
       this.statementId = statementId;
       this.parameterValues = parameterValues;
       this.offset = offset;
@@ -294,10 +303,11 @@ public interface Service {
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#createStatement(org.apache.calcite.avatica.Meta.ConnectionHandle)}. */
   class CreateStatementRequest extends Request {
-    public final int connectionId;
+    public final String connectionId;
 
     @JsonCreator
-    public CreateStatementRequest(@JsonProperty("signature") int connectionId) {
+    public CreateStatementRequest(
+        @JsonProperty("signature") String connectionId) {
       this.connectionId = connectionId;
     }
 
@@ -309,21 +319,29 @@ public interface Service {
   /** Response from
    * {@link org.apache.calcite.avatica.remote.Service.CreateStatementRequest}. */
   class CreateStatementResponse extends Response {
-    public final int id;
+    public final String connectionId;
+    public final int statementId;
 
     @JsonCreator
-    public CreateStatementResponse(@JsonProperty("id") int id) {
-      this.id = id;
+    public CreateStatementResponse(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId) {
+      this.connectionId = connectionId;
+      this.statementId = statementId;
     }
   }
 
   /** Request for
    * {@link org.apache.calcite.avatica.Meta#closeStatement(org.apache.calcite.avatica.Meta.StatementHandle)}. */
   class CloseStatementRequest extends Request {
+    public final String connectionId;
     public final int statementId;
 
     @JsonCreator
-    public CloseStatementRequest(@JsonProperty("id") int statementId) {
+    public CloseStatementRequest(
+        @JsonProperty("connectionId") String connectionId,
+        @JsonProperty("statementId") int statementId) {
+      this.connectionId = connectionId;
       this.statementId = statementId;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
----------------------------------------------------------------------
diff --git a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
index 9c03b3b..6713632 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/RemoteDriverTest.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.avatica.test;
 
 import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaPreparedStatement;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.jdbc.JdbcMeta;
@@ -27,6 +28,7 @@ import org.apache.calcite.avatica.remote.Service;
 
 import net.hydromatic.scott.data.hsqldb.ScottHsqldb;
 
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -72,6 +74,11 @@ public class RemoteDriverTest {
     return DriverManager.getConnection("jdbc:avatica:remote:factory=" + QRJS);
   }
 
+  @Before
+  public void before() throws Exception {
+    QuasiRemoteJdbcServiceFactory.initService();
+  }
+
   @Test public void testRegister() throws Exception {
     final Connection connection =
         DriverManager.getConnection("jdbc:avatica:remote:");
@@ -194,7 +201,7 @@ public class RemoteDriverTest {
     try (AvaticaConnection connection = (AvaticaConnection) ljs()) {
       Map<Integer, AvaticaStatement> clientMap = connection.statementMap;
       Map<Integer, Statement> serverMap =
-          QuasiRemoteJdbcServiceFactory.getRemoteMap(connection);
+          QuasiRemoteJdbcServiceFactory.getRemoteStatementMap(connection);
       assertEquals(0, clientMap.size());
       assertEquals(0, serverMap.size());
       Statement stmt = connection.createStatement();
@@ -206,6 +213,33 @@ public class RemoteDriverTest {
     }
   }
 
+  @Test public void testConnectionIsolation() throws Exception {
+    final String sql = "select * from (values (1, 'a'))";
+    Connection conn1 = ljs();
+    Connection conn2 = ljs();
+    Map<String, Connection> connectionMap =
+        QuasiRemoteJdbcServiceFactory.getRemoteConnectionMap(
+            (AvaticaConnection) conn1);
+    assertEquals("should contain at least the default connection",
+        1, connectionMap.size());
+    PreparedStatement conn1stmt1 = conn1.prepareStatement(sql);
+    assertEquals(
+        "statement creation implicitly creates a connection server-side",
+        2, connectionMap.size());
+    PreparedStatement conn2stmt1 = conn2.prepareStatement(sql);
+    assertEquals(
+        "statement creation implicitly creates a connection server-side",
+        3, connectionMap.size());
+    AvaticaPreparedStatement s1 = (AvaticaPreparedStatement) conn1stmt1;
+    AvaticaPreparedStatement s2 = (AvaticaPreparedStatement) conn2stmt1;
+    assertFalse("connection id's should be unique",
+        s1.handle.connectionId.equalsIgnoreCase(s2.handle.connectionId));
+    conn2.close();
+    conn1.close();
+//    assertEquals("closing a connection closes the server-side connection",
+//        1, connectionMap.size());
+  }
+
   private void checkStatementExecuteQuery(Connection connection)
       throws SQLException {
     final Statement statement = connection.createStatement();
@@ -278,42 +312,51 @@ public class RemoteDriverTest {
     connection.close();
   }
 
-  /** Factory that creates a service based on a local JDBC connection. */
+  /**
+   * Factory that creates a service based on a local JDBC connection.
+   */
   public static class LocalJdbcServiceFactory implements Service.Factory {
     @Override public Service create(AvaticaConnection connection) {
       try {
-        Connection connection1 =
-            DriverManager.getConnection(CONNECTION_SPEC.url,
-                CONNECTION_SPEC.username, CONNECTION_SPEC.password);
-        return new LocalService(new JdbcMeta(connection1));
+        return new LocalService(new JdbcMeta(CONNECTION_SPEC.url,
+            CONNECTION_SPEC.username, CONNECTION_SPEC.password));
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
   }
 
-  /** Factory that creates a service based on a local JDBC connection. */
+  /**
+   * Factory that hands out a shared LocalJsonService over a local JdbcMeta.
+   */
   public static class QuasiRemoteJdbcServiceFactory implements Service.Factory {
-    @Override public Service create(AvaticaConnection connection) {
+
+    /** Singleton service; re-created before each test by initService(). */
+    private static Service service;
+
+    static void initService() {
       try {
-        Connection connection1 =
-            DriverManager.getConnection(CONNECTION_SPEC.url,
-                CONNECTION_SPEC.username, CONNECTION_SPEC.password);
-        final JdbcMeta jdbcMeta = new JdbcMeta(connection1);
+        final JdbcMeta jdbcMeta = new JdbcMeta(CONNECTION_SPEC.url,
+            CONNECTION_SPEC.username, CONNECTION_SPEC.password);
         final LocalService localService = new LocalService(jdbcMeta);
-        final LocalJsonService localJsonService =
-            new LocalJsonService(localService);
-        return localJsonService;
+        service = new LocalJsonService(localService);
       } catch (SQLException e) {
         throw new RuntimeException(e);
       }
     }
 
+    @Override public Service create(AvaticaConnection connection) {
+      assert service != null;
+      return service;
+    }
+
     /**
      * Reach into the guts of a quasi-remote connection and pull out the
      * statement map from the other side.
+     * TODO: refactor tests to replace reflection with package-local access
      */
-    static Map<Integer, Statement> getRemoteMap(AvaticaConnection connection) throws Exception {
+    static Map<Integer, Statement>
+    getRemoteStatementMap(AvaticaConnection connection) throws Exception {
       Field metaF = AvaticaConnection.class.getDeclaredField("meta");
       metaF.setAccessible(true);
       Meta clientMeta = (Meta) metaF.get(connection);
@@ -332,6 +375,32 @@ public class RemoteDriverTest {
       jdbcMetaStatementMapF.setAccessible(true);
       return (Map<Integer, Statement>) jdbcMetaStatementMapF.get(serverMeta);
     }
+
+    /**
+     * Reach into the guts of a quasi-remote connection and pull out the
+     * connection map from the other side.
+     * TODO: refactor tests to replace reflection with package-local access
+     */
+    static Map<String, Connection>
+    getRemoteConnectionMap(AvaticaConnection connection) throws Exception {
+      Field metaF = AvaticaConnection.class.getDeclaredField("meta");
+      metaF.setAccessible(true);
+      Meta clientMeta = (Meta) metaF.get(connection);
+      Field remoteMetaServiceF = clientMeta.getClass().getDeclaredField("service");
+      remoteMetaServiceF.setAccessible(true);
+      LocalJsonService remoteMetaService = (LocalJsonService) remoteMetaServiceF.get(clientMeta);
+      Field remoteMetaServiceServiceF = remoteMetaService.getClass().getDeclaredField("service");
+      remoteMetaServiceServiceF.setAccessible(true);
+      LocalService remoteMetaServiceService =
+          (LocalService) remoteMetaServiceServiceF.get(remoteMetaService);
+      Field remoteMetaServiceServiceMetaF =
+          remoteMetaServiceService.getClass().getDeclaredField("meta");
+      remoteMetaServiceServiceMetaF.setAccessible(true);
+      JdbcMeta serverMeta = (JdbcMeta) remoteMetaServiceServiceMetaF.get(remoteMetaServiceService);
+      Field jdbcMetaStatementMapF = JdbcMeta.class.getDeclaredField("connectionMap");
+      jdbcMetaStatementMapF.setAccessible(true);
+      return (Map<String, Connection>) jdbcMetaStatementMapF.get(serverMeta);
+    }
   }
 
   /** Information necessary to create a JDBC connection. Specify one to run

http://git-wip-us.apache.org/repos/asf/incubator-calcite/blob/0ad6019b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index ad2f9cf..849ef63 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -190,7 +190,8 @@ public class CalciteMetaImpl extends MetaImpl {
               return Linq4j.asEnumerable(firstFrame.rows);
             }
           };
-      return new MetaResultSet(statement.getId(), true, signature, firstFrame);
+      return new MetaResultSet(connection.id, statement.getId(), true,
+          signature, firstFrame);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
@@ -459,16 +460,20 @@ public class CalciteMetaImpl extends MetaImpl {
     }
   }
 
-  public Signature prepare(StatementHandle h, String sql, int maxRowCount) {
+  @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
+      int maxRowCount) {
+    final StatementHandle h = createStatement(ch);
     final CalciteConnectionImpl calciteConnection = getConnection();
     CalciteServerStatement statement = calciteConnection.server.getStatement(h);
-    return calciteConnection.parseQuery(sql, statement.createPrepareContext(),
+    calciteConnection.parseQuery(sql, statement.createPrepareContext(),
         maxRowCount);
+    return h;
   }
 
-  public MetaResultSet prepareAndExecute(StatementHandle h, String sql,
-      int maxRowCount, PrepareCallback callback) {
+  @Override public MetaResultSet prepareAndExecute(ConnectionHandle ch,
+      String sql, int maxRowCount, PrepareCallback callback) {
     final CalcitePrepare.CalciteSignature<Object> signature;
+    final StatementHandle h = createStatement(ch);
     try {
       synchronized (callback.getMonitor()) {
         callback.clear();
@@ -480,7 +485,7 @@ public class CalciteMetaImpl extends MetaImpl {
         callback.assign(signature, null);
       }
       callback.execute();
-      return new MetaResultSet(h.id, false, signature, null);
+      return new MetaResultSet(h.connectionId, h.id, false, signature, null);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }


Mime
View raw message