From: jcamacho@apache.org
To: commits@calcite.apache.org
Reply-To: dev@calcite.apache.org
Date: Wed, 11 Nov 2015 12:25:38 -0000
Message-Id: <69f5aa5e9be04a38bcb9f99a0bd57ced@git.apache.org>
Mailing-List: contact commits-help@calcite.apache.org; run by ezmlm
Subject: [32/50] [abbrv] calcite git commit: [CALCITE-903] Enable Avatica client to recover from missing server-side state (Josh Elser)

[CALCITE-903] Enable Avatica client to recover from missing server-side state (Josh Elser)

Close apache/calcite#140

Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/97df1acb
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/97df1acb
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/97df1acb

Branch: refs/heads/branch-release
Commit: 
97df1acbe82881f0fd196477dadbcbc173bddc20 Parents: 1a491b7 Author: Josh Elser Authored: Wed Sep 23 11:53:13 2015 -0400 Committer: Julian Hyde Committed: Wed Oct 28 16:42:33 2015 -0700 ---------------------------------------------------------------------- .../apache/calcite/avatica/jdbc/JdbcMeta.java | 109 +- .../calcite/avatica/jdbc/JdbcResultSet.java | 12 +- .../calcite/avatica/jdbc/StatementInfo.java | 170 ++ .../calcite/avatica/jdbc/StatementInfoTest.java | 138 + .../remote/AlternatingRemoteMetaTest.java | 392 +++ .../calcite/avatica/remote/RemoteMetaTest.java | 6 +- .../calcite/avatica/AvaticaConnection.java | 77 +- .../avatica/AvaticaDatabaseMetaData.java | 789 ++++- .../apache/calcite/avatica/AvaticaFactory.java | 3 +- .../calcite/avatica/AvaticaJdbc41Factory.java | 4 +- .../avatica/AvaticaPreparedStatement.java | 11 +- .../calcite/avatica/AvaticaResultSet.java | 5 +- .../calcite/avatica/AvaticaStatement.java | 40 +- .../java/org/apache/calcite/avatica/Meta.java | 17 +- .../org/apache/calcite/avatica/MetaImpl.java | 69 +- .../avatica/MissingResultsException.java | 41 + .../avatica/NoSuchConnectionException.java | 37 + .../avatica/NoSuchStatementException.java | 39 + .../org/apache/calcite/avatica/QueryState.java | 489 +++ .../calcite/avatica/UnregisteredDriver.java | 2 +- .../apache/calcite/avatica/proto/Common.java | 2847 +++++++++++++++++- .../apache/calcite/avatica/proto/Requests.java | 770 ++++- .../apache/calcite/avatica/proto/Responses.java | 769 ++++- .../calcite/avatica/remote/AbstractHandler.java | 5 + .../calcite/avatica/remote/AbstractService.java | 5 +- .../avatica/remote/AvaticaHttpClient.java | 34 + .../avatica/remote/AvaticaHttpClientImpl.java | 73 + .../apache/calcite/avatica/remote/Driver.java | 65 +- .../apache/calcite/avatica/remote/Handler.java | 4 + .../calcite/avatica/remote/JsonService.java | 8 + .../calcite/avatica/remote/LocalService.java | 99 +- .../avatica/remote/MetaDataOperation.java | 181 ++ 
.../calcite/avatica/remote/MockJsonService.java | 2 +- .../avatica/remote/MockProtobufService.java | 2 +- .../calcite/avatica/remote/ProtobufService.java | 4 + .../avatica/remote/ProtobufTranslationImpl.java | 6 + .../calcite/avatica/remote/RemoteMeta.java | 368 ++- .../avatica/remote/RemoteProtobufService.java | 42 +- .../calcite/avatica/remote/RemoteService.java | 45 +- .../apache/calcite/avatica/remote/Service.java | 227 +- avatica/src/main/protobuf/common.proto | 64 + avatica/src/main/protobuf/requests.proto | 7 + avatica/src/main/protobuf/responses.proto | 8 + .../calcite/avatica/AvaticaConnectionTest.java | 60 + .../apache/calcite/avatica/QueryStateTest.java | 513 ++++ .../avatica/remote/AvaticaHttpClientTest.java | 93 + .../avatica/remote/MetaDataOperationTest.java | 37 + .../avatica/remote/ProtobufHandlerTest.java | 2 +- .../remote/ProtobufTranslationImplTest.java | 37 +- .../calcite/avatica/test/JsonHandlerTest.java | 6 +- .../calcite/jdbc/CalciteJdbc41Factory.java | 3 +- .../apache/calcite/jdbc/CalciteMetaImpl.java | 20 +- .../apache/calcite/jdbc/CalciteResultSet.java | 2 +- src/main/config/checkstyle/checker.xml | 4 - 54 files changed, 8302 insertions(+), 560 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java index b8e4ea4..1bfd7f6 100644 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java +++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java @@ -23,6 +23,10 @@ import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.avatica.ConnectionPropertiesImpl; import 
org.apache.calcite.avatica.Meta; import org.apache.calcite.avatica.MetaImpl; +import org.apache.calcite.avatica.MissingResultsException; +import org.apache.calcite.avatica.NoSuchConnectionException; +import org.apache.calcite.avatica.NoSuchStatementException; +import org.apache.calcite.avatica.QueryState; import org.apache.calcite.avatica.SqlType; import org.apache.calcite.avatica.remote.TypedValue; @@ -51,7 +55,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -64,7 +67,7 @@ public class JdbcMeta implements Meta { private static final String STMT_CACHE_KEY_BASE = "avatica.statementcache"; - /** Special value for {@link Statement#getLargeMaxRows()} that means fetch + /** Special value for {@code Statement#getLargeMaxRows()} that means fetch * an unlimited number of rows in a single batch. * *
<p>
Any other negative value will return an unlimited number of rows but @@ -290,7 +293,7 @@ public class JdbcMeta implements Meta { private int registerMetaStatement(ResultSet rs) throws SQLException { final int id = statementIdGenerator.getAndIncrement(); StatementInfo statementInfo = new StatementInfo(rs.getStatement()); - statementInfo.resultSet = rs; + statementInfo.setResultSet(rs); statementCache.put(id, statementInfo); return id; } @@ -508,7 +511,7 @@ public class JdbcMeta implements Meta { return null; } - public Iterable<Object> createIterable(StatementHandle handle, + public Iterable<Object> createIterable(StatementHandle handle, QueryState state, Signature signature, List<TypedValue> parameterValues, Frame firstFrame) { return null; } @@ -519,7 +522,8 @@ public class JdbcMeta implements Meta { } Connection conn = connectionCache.getIfPresent(id); if (conn == null) { - throw new RuntimeException("Connection not found: invalid id, closed, or expired: " + id); + throw new NoSuchConnectionException("Connection not found: invalid id, closed, or expired: " + + id); } return conn; } @@ -550,8 +554,9 @@ public class JdbcMeta implements Meta { LOG.trace("closing statement " + h); } try { - if (info.resultSet != null) { - info.resultSet.close(); + ResultSet results = info.getResultSet(); + if (info.isResultSetInitialized() && null != results) { + results.close(); } info.statement.close(); } catch (SQLException e) { @@ -674,12 +679,11 @@ public class JdbcMeta implements Meta { } public ExecuteResult prepareAndExecute(StatementHandle h, String sql, - long maxRowCount, PrepareCallback callback) { + long maxRowCount, PrepareCallback callback) throws NoSuchStatementException { try { final StatementInfo info = statementCache.getIfPresent(h.id); if (info == null) { - throw new RuntimeException("Statement not found, potentially expired. 
" - + h); + throw new NoSuchStatementException(h); } final Statement statement = info.statement; // Special handling of maxRowCount as JDBC 0 is unlimited, our meta 0 row @@ -689,18 +693,18 @@ public class JdbcMeta implements Meta { statement.setMaxRows(0); } boolean ret = statement.execute(sql); - info.resultSet = statement.getResultSet(); - assert ret || info.resultSet == null; + info.setResultSet(statement.getResultSet()); + // Either execute(sql) returned true or the resultSet was null + assert ret || null == info.getResultSet(); final List resultSets = new ArrayList<>(); - if (info.resultSet == null) { + if (null == info.getResultSet()) { // Create a special result set that just carries update count resultSets.add( JdbcResultSet.count(h.connectionId, h.id, AvaticaUtils.getLargeUpdateCount(statement))); } else { resultSets.add( - JdbcResultSet.create(h.connectionId, h.id, info.resultSet, - maxRowCount)); + JdbcResultSet.create(h.connectionId, h.id, info.getResultSet(), maxRowCount)); } if (LOG.isTraceEnabled()) { LOG.trace("prepAndExec statement " + h); @@ -712,19 +716,51 @@ public class JdbcMeta implements Meta { } } - public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) { + public boolean syncResults(StatementHandle sh, QueryState state, long offset) + throws NoSuchStatementException { + try { + final Connection conn = getConnection(sh.connectionId); + final StatementInfo info = statementCache.getIfPresent(sh.id); + if (null == info) { + throw new NoSuchStatementException(sh); + } + final Statement statement = info.statement; + // Let the state recreate the necessary ResultSet on the Statement + info.setResultSet(state.invoke(conn, statement)); + + if (null != info.getResultSet()) { + // If it is non-null, try to advance to the requested offset. + return info.advanceResultSetToOffset(info.getResultSet(), offset); + } + + // No results, nothing to do. Client can move on. 
+ return false; + } catch (SQLException e) { + throw propagate(e); + } + } + + public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws + NoSuchStatementException, MissingResultsException { if (LOG.isTraceEnabled()) { LOG.trace("fetching " + h + " offset:" + offset + " fetchMaxRowCount:" + fetchMaxRowCount); } try { - final StatementInfo statementInfo = Objects.requireNonNull( - statementCache.getIfPresent(h.id), - "Statement not found, potentially expired. " + h); - if (statementInfo.resultSet == null) { + final StatementInfo statementInfo = statementCache.getIfPresent(h.id); + if (null == statementInfo) { + // Statement might have expired, or never existed on this server. + throw new NoSuchStatementException(h); + } + + if (!statementInfo.isResultSetInitialized()) { + // The Statement exists, but the results are missing. Need to call syncResults(...) + throw new MissingResultsException(h); + } + if (statementInfo.getResultSet() == null) { return Frame.EMPTY; } else { - return JdbcResultSet.frame(statementInfo.resultSet, offset, + return JdbcResultSet.frame(statementInfo, statementInfo.getResultSet(), offset, fetchMaxRowCount, calendar); } } catch (SQLException e) { @@ -740,15 +776,16 @@ public class JdbcMeta implements Meta { } @Override public ExecuteResult execute(StatementHandle h, - List<TypedValue> parameterValues, long maxRowCount) { + List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException { try { if (MetaImpl.checkParameterValueHasNull(parameterValues)) { throw new SQLException("exception while executing query: unbound parameter"); } - final StatementInfo statementInfo = Objects.requireNonNull( - statementCache.getIfPresent(h.id), - "Statement not found, potentially expired. 
" + h); + final StatementInfo statementInfo = statementCache.getIfPresent(h.id); + if (null == statementInfo) { + throw new NoSuchStatementException(h); + } final List resultSets = new ArrayList<>(); final PreparedStatement preparedStatement = (PreparedStatement) statementInfo.statement; @@ -772,14 +809,16 @@ public class JdbcMeta implements Meta { signature2 = h.signature; } - statementInfo.resultSet = preparedStatement.getResultSet(); - if (statementInfo.resultSet == null) { + // Make sure we set this for subsequent fetch()'s to find the result set. + statementInfo.setResultSet(preparedStatement.getResultSet()); + + if (statementInfo.getResultSet() == null) { frame = Frame.EMPTY; resultSets.add(JdbcResultSet.empty(h.connectionId, h.id, signature2)); } else { resultSets.add( JdbcResultSet.create(h.connectionId, h.id, - statementInfo.resultSet, maxRowCount, signature2)); + statementInfo.getResultSet(), maxRowCount, signature2)); } } else { resultSets.add( @@ -793,16 +832,6 @@ public class JdbcMeta implements Meta { } } - /** All we know about a statement. */ - private static class StatementInfo { - final Statement statement; // sometimes a PreparedStatement - ResultSet resultSet; - - private StatementInfo(Statement statement) { - this.statement = Objects.requireNonNull(statement); - } - } - /** Configurable statement cache settings. */ public enum StatementCacheSettings { /** JDBC connection property for setting connection cache concurrency level. 
*/ @@ -917,8 +946,8 @@ public class JdbcMeta implements Meta { + notification.getCause()); } try { - if (doomed.resultSet != null) { - doomed.resultSet.close(); + if (doomed.getResultSet() != null) { + doomed.getResultSet().close(); } if (doomed.statement != null) { doomed.statement.close(); http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java index 30ee7f4..6630124 100644 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java +++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java @@ -88,7 +88,7 @@ class JdbcResultSet extends Meta.MetaResultSet { } else { fetchRowCount = (int) maxRowCount; } - final Meta.Frame firstFrame = frame(resultSet, 0, fetchRowCount, calendar); + final Meta.Frame firstFrame = frame(null, resultSet, 0, fetchRowCount, calendar); if (firstFrame.done) { resultSet.close(); } @@ -114,7 +114,7 @@ class JdbcResultSet extends Meta.MetaResultSet { /** Creates a frame containing a given number or unlimited number of rows * from a result set. 
*/ - static Meta.Frame frame(ResultSet resultSet, long offset, + static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset, int fetchMaxRowCount, Calendar calendar) throws SQLException { final ResultSetMetaData metaData = resultSet.getMetaData(); final int columnCount = metaData.getColumnCount(); @@ -126,7 +126,13 @@ class JdbcResultSet extends Meta.MetaResultSet { // Meta prepare/prepareAndExecute 0 return 0 row and done boolean done = fetchMaxRowCount == 0; for (int i = 0; fetchMaxRowCount < 0 || i < fetchMaxRowCount; i++) { - if (!resultSet.next()) { + final boolean hasRow; + if (null != info) { + hasRow = info.next(); + } else { + hasRow = resultSet.next(); + } + if (!hasRow) { done = true; resultSet.close(); break; http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java new file mode 100644 index 0000000..ff27d05 --- /dev/null +++ b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.jdbc; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.Statement; +import java.util.Objects; + +/** + * All we know about a statement. Encapsulates a {@link ResultSet}. + */ +public class StatementInfo { + private volatile Boolean relativeSupported = null; + + final Statement statement; // sometimes a PreparedStatement + private ResultSet resultSet; + private long position = 0; + + // True when setResultSet(ResultSet) is called to let us determine the difference between + // a null ResultSet (from an update) from the lack of a ResultSet. + private boolean resultsInitialized = false; + + public StatementInfo(Statement statement) { + this.statement = Objects.requireNonNull(statement); + } + + // Visible for testing + void setPosition(long position) { + this.position = position; + } + + // Visible for testing + long getPosition() { + return this.position; + } + + /** + * Set a ResultSet on this object. + * + * @param resultSet The current ResultSet + */ + public void setResultSet(ResultSet resultSet) { + resultsInitialized = true; + this.resultSet = resultSet; + } + + /** + * @return The {@link ResultSet} for this Statement, may be null. + */ + public ResultSet getResultSet() { + return this.resultSet; + } + + /** + * @return True if {@link #setResultSet(ResultSet)} was ever invoked. + */ + public boolean isResultSetInitialized() { + return resultsInitialized; + } + + /** + * @see ResultSet#next() + */ + public boolean next() throws SQLException { + return _next(resultSet); + } + + boolean _next(ResultSet results) throws SQLException { + boolean ret = results.next(); + position++; + return ret; + } + + /** + * Consumes offset - position elements from the {@link ResultSet}. 
+ * + * @param offset The offset to advance to + * @return True if the resultSet was advanced to the current point, false if insufficient rows + * were present to advance to the requested offset. + */ + public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException { + if (offset < 0 || offset < position) { + throw new IllegalArgumentException("Offset should be " + + " non-negative and not less than the current position. " + offset + ", " + position); + } + if (position >= offset) { + return true; + } + + if (null == relativeSupported) { + Boolean moreResults = null; + synchronized (this) { + if (null == relativeSupported) { + try { + moreResults = advanceByRelative(results, offset); + relativeSupported = true; + } catch (SQLFeatureNotSupportedException e) { + relativeSupported = false; + } + } + } + + if (null != moreResults) { + // We figured out whether or not relative is supported. + // Make sure we actually do the necessary work. + if (!relativeSupported) { + // We avoided calling advanceByNext in the synchronized block earlier. + moreResults = advanceByNext(results, offset); + } + + return moreResults; + } + + // Another thread updated the RELATIVE_SUPPORTED before we did, fall through. + } + + if (relativeSupported) { + return advanceByRelative(results, offset); + } else { + return advanceByNext(results, offset); + } + } + + private boolean advanceByRelative(ResultSet results, long offset) throws SQLException { + long diff = offset - position; + while (diff > Integer.MAX_VALUE) { + if (!results.relative(Integer.MAX_VALUE)) { + // Avoid updating position until relative succeeds. + position += Integer.MAX_VALUE; + return false; + } + // Avoid updating position until relative succeeds. + position += Integer.MAX_VALUE; + diff -= Integer.MAX_VALUE; + } + boolean ret = results.relative((int) diff); + // Make sure we only update the position after successfully calling relative(int). 
+ position += diff; + return ret; + } + + private boolean advanceByNext(ResultSet results, long offset) throws SQLException { + while (position < offset) { + // Advance while maintaining `position` + if (!_next(results)) { + return false; + } + } + + return true; + } +} + +// End StatementInfo.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java new file mode 100644 index 0000000..2984692 --- /dev/null +++ b/avatica-server/src/test/java/org/apache/calcite/avatica/jdbc/StatementInfoTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.avatica.jdbc; + +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.sql.ResultSet; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.Statement; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests covering {@link StatementInfo}. + */ +public class StatementInfoTest { + + @Test + public void testLargeOffsets() throws Exception { + Statement stmt = Mockito.mock(Statement.class); + ResultSet results = Mockito.mock(ResultSet.class); + + StatementInfo info = new StatementInfo(stmt); + + Mockito.when(results.relative(Integer.MAX_VALUE)).thenReturn(true, true); + Mockito.when(results.relative(1)).thenReturn(true); + + long offset = 1L + Integer.MAX_VALUE + Integer.MAX_VALUE; + assertTrue(info.advanceResultSetToOffset(results, offset)); + + InOrder inOrder = Mockito.inOrder(results); + + inOrder.verify(results, Mockito.times(2)).relative(Integer.MAX_VALUE); + inOrder.verify(results).relative(1); + + assertEquals(offset, info.getPosition()); + } + + @Test + public void testNextUpdatesPosition() throws Exception { + Statement stmt = Mockito.mock(Statement.class); + ResultSet results = Mockito.mock(ResultSet.class); + + StatementInfo info = new StatementInfo(stmt); + info.setResultSet(results); + + Mockito.when(results.next()).thenReturn(true, true, true, false); + + for (int i = 0; i < 3; i++) { + assertTrue(i + "th call of next() should return true", info.next()); + assertEquals(info.getPosition(), i + 1); + } + + assertFalse("Expected last next() to return false", info.next()); + assertEquals(info.getPosition(), 4L); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoMovement() throws Exception { + Statement stmt = 
Mockito.mock(Statement.class); + ResultSet results = Mockito.mock(ResultSet.class); + + StatementInfo info = new StatementInfo(stmt); + info.setPosition(500); + + info.advanceResultSetToOffset(results, 400); + } + + @Test public void testResultSetGetter() throws Exception { + Statement stmt = Mockito.mock(Statement.class); + ResultSet results = Mockito.mock(ResultSet.class); + + StatementInfo info = new StatementInfo(stmt); + + assertFalse("ResultSet should not be initialized", info.isResultSetInitialized()); + assertNull("ResultSet should be null", info.getResultSet()); + + info.setResultSet(results); + + assertTrue("ResultSet should be initialized", info.isResultSetInitialized()); + assertEquals(results, info.getResultSet()); + } + + @Test public void testCheckPositionAfterFailedRelative() throws Exception { + Statement stmt = Mockito.mock(Statement.class); + ResultSet results = Mockito.mock(ResultSet.class); + final long offset = 500; + + StatementInfo info = new StatementInfo(stmt); + info.setResultSet(results); + + // relative() doesn't work + Mockito.when(results.relative((int) offset)).thenThrow(new SQLFeatureNotSupportedException()); + // Should fall back to next(), 500 calls to next, 1 false + Mockito.when(results.next()).then(new Answer<Boolean>() { + private long invocations = 0; + + // Return true until 500, false after. 
+ @Override public Boolean answer(InvocationOnMock invocation) throws Throwable { + invocations++; + if (invocations >= offset) { + return false; + } + return true; + } + }); + + info.advanceResultSetToOffset(results, offset); + + // Verify correct position + assertEquals(offset, info.getPosition()); + // Make sure that we actually advanced the result set + Mockito.verify(results, Mockito.times(500)).next(); + } +} + +// End StatementInfoTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java new file mode 100644 index 0000000..df2862d --- /dev/null +++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/AlternatingRemoteMetaTest.java @@ -0,0 +1,392 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.calcite.avatica.remote; + +import org.apache.calcite.avatica.AvaticaConnection; +import org.apache.calcite.avatica.AvaticaStatement; +import org.apache.calcite.avatica.ConnectionConfig; +import org.apache.calcite.avatica.ConnectionPropertiesImpl; +import org.apache.calcite.avatica.ConnectionSpec; +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.jdbc.JdbcMeta; +import org.apache.calcite.avatica.server.AvaticaHandler; +import org.apache.calcite.avatica.server.HttpServer; +import org.apache.calcite.avatica.server.Main; +import org.apache.calcite.avatica.server.Main.HandlerFactory; + +import com.google.common.cache.Cache; + +import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.net.MalformedURLException; +import java.net.URL; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests that verify that the Driver still functions when requests are randomly bounced between + * more than one server. 
+ */ +public class AlternatingRemoteMetaTest { + private static final ConnectionSpec CONNECTION_SPEC = ConnectionSpec.HSQLDB; + + private static String url; + + static { + try { + DriverManager.registerDriver(new AlternatingDriver()); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + + // Keep a reference to the servers we start to clean them up after + private static final List<HttpServer> ACTIVE_SERVERS = new ArrayList<>(); + + /** Factory that provides a {@link JdbcMeta}. */ + public static class FullyRemoteJdbcMetaFactory implements Meta.Factory { + + private static JdbcMeta instance = null; + + private static JdbcMeta getInstance() { + if (instance == null) { + try { + instance = new JdbcMeta(CONNECTION_SPEC.url, CONNECTION_SPEC.username, + CONNECTION_SPEC.password); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + return instance; + } + + @Override public Meta create(List<String> args) { + return getInstance(); + } + } + + /** + * AvaticaHttpClient implementation that randomly chooses among the provided URLs. + */ + public static class AlternatingAvaticaHttpClient implements AvaticaHttpClient { + private final List<AvaticaHttpClientImpl> clients; + private final Random r = new Random(); + + public AlternatingAvaticaHttpClient(List<URL> urls) { + //System.out.println("Constructing clients for " + urls); + clients = new ArrayList<>(urls.size()); + for (URL url : urls) { + clients.add(new AvaticaHttpClientImpl(url)); + } + } + + public byte[] send(byte[] request) { + AvaticaHttpClientImpl client = clients.get(r.nextInt(clients.size())); + //System.out.println("URL: " + client.url); + return client.send(request); + } + + } + + /** + * Driver implementation {@link AlternatingAvaticaHttpClient}. 
+ */ + public static class AlternatingDriver extends Driver { + + public static final String PREFIX = "jdbc:avatica:remote-alternating:"; + + @Override protected String getConnectStringPrefix() { + return PREFIX; + } + + @Override public Meta createMeta(AvaticaConnection connection) { + final ConnectionConfig config = connection.config(); + return new RemoteMeta(connection, new RemoteService(getHttpClient(connection, config))); + } + + @Override AvaticaHttpClient getHttpClient(AvaticaConnection connection, + ConnectionConfig config) { + return new AlternatingAvaticaHttpClient(parseUrls(config.url())); + } + + List<URL> parseUrls(String urlStr) { + final List<URL> urls = new ArrayList<>(); + final char comma = ','; + + int prevIndex = 0; + int index = urlStr.indexOf(comma); + if (-1 == index) { + try { + return Collections.singletonList(new URL(urlStr)); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + // String split w/o regex + while (-1 != index) { + try { + urls.add(new URL(urlStr.substring(prevIndex, index))); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + prevIndex = index + 1; + index = urlStr.indexOf(comma, prevIndex); + } + + // Get the last one + try { + urls.add(new URL(urlStr.substring(prevIndex))); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + + return urls; + } + + } + + @BeforeClass + public static void beforeClass() throws Exception { + final String[] mainArgs = new String[] { FullyRemoteJdbcMetaFactory.class.getName() }; + + // Bind to '0' to pluck an ephemeral port instead of expecting a certain one to be free + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 2; i++) { + if (sb.length() > 0) { + sb.append(","); + } + HttpServer jsonServer = Main.start(mainArgs, 0, new HandlerFactory() { + @Override public AbstractHandler createHandler(Service service) { + return new AvaticaHandler(service); + } + }); + ACTIVE_SERVERS.add(jsonServer); + 
sb.append("http://localhost:").append(jsonServer.getPort()); + } + + url = AlternatingDriver.PREFIX + "url=" + sb.toString(); + } + + @AfterClass public static void afterClass() throws Exception { + for (HttpServer server : ACTIVE_SERVERS) { + if (server != null) { + server.stop(); + } + } + } + + private static Meta getMeta(AvaticaConnection conn) throws Exception { + Field f = AvaticaConnection.class.getDeclaredField("meta"); + f.setAccessible(true); + return (Meta) f.get(conn); + } + + private static Meta.ExecuteResult prepareAndExecuteInternal(AvaticaConnection conn, + final AvaticaStatement statement, String sql, int maxRowCount) throws Exception { + Method m = + AvaticaConnection.class.getDeclaredMethod("prepareAndExecuteInternal", + AvaticaStatement.class, String.class, long.class); + m.setAccessible(true); + return (Meta.ExecuteResult) m.invoke(conn, statement, sql, maxRowCount); + } + + private static Connection getConnection(JdbcMeta m, String id) throws Exception { + Field f = JdbcMeta.class.getDeclaredField("connectionCache"); + f.setAccessible(true); + //noinspection unchecked + Cache connectionCache = (Cache) f.get(m); + return connectionCache.getIfPresent(id); + } + + @Test public void testRemoteExecuteMaxRowCount() throws Exception { + ConnectionSpec.getDatabaseLock().lock(); + try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) { + final AvaticaStatement statement = conn.createStatement(); + prepareAndExecuteInternal(conn, statement, + "select * from (values ('a', 1), ('b', 2))", 0); + ResultSet rs = statement.getResultSet(); + int count = 0; + while (rs.next()) { + count++; + } + assertEquals("Check maxRowCount=0 and ResultSets is 0 row", count, 0); + assertEquals("Check result set meta is still there", + rs.getMetaData().getColumnCount(), 2); + rs.close(); + statement.close(); + conn.close(); + } finally { + ConnectionSpec.getDatabaseLock().unlock(); + } + } + + /** Test case for + * [CALCITE-780] + * HTTP error 
413 when sending a long string to the Avatica server. */ + @Test public void testRemoteExecuteVeryLargeQuery() throws Exception { + ConnectionSpec.getDatabaseLock().lock(); + try { + // Before the bug was fixed, a value over 7998 caused an HTTP 413. + // 16K bytes, I guess. + checkLargeQuery(8); + checkLargeQuery(240); + checkLargeQuery(8000); + checkLargeQuery(240000); + } finally { + ConnectionSpec.getDatabaseLock().unlock(); + } + } + + private void checkLargeQuery(int n) throws Exception { + try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) { + final AvaticaStatement statement = conn.createStatement(); + final String frenchDisko = "It said human existence is pointless\n" + + "As acts of rebellious solidarity\n" + + "Can bring sense in this world\n" + + "La resistance!\n"; + final String sql = "select '" + + longString(frenchDisko, n) + + "' as s from (values 'x')"; + prepareAndExecuteInternal(conn, statement, sql, -1); + ResultSet rs = statement.getResultSet(); + int count = 0; + while (rs.next()) { + count++; + } + assertThat(count, is(1)); + rs.close(); + statement.close(); + conn.close(); + } + } + + /** Creates a string of exactly {@code length} characters by concatenating + * {@code fragment}. */ + private static String longString(String fragment, int length) { + assert fragment.length() > 0; + final StringBuilder buf = new StringBuilder(); + while (buf.length() < length) { + buf.append(fragment); + } + buf.setLength(length); + return buf.toString(); + } + + @Test public void testRemoteConnectionProperties() throws Exception { + ConnectionSpec.getDatabaseLock().lock(); + try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url)) { + String id = conn.id; + final Map m = ((RemoteMeta) getMeta(conn)).propsMap; + assertFalse("remote connection map should start ignorant", m.containsKey(id)); + // force creating a connection object on the remote side. 
+ try (final Statement stmt = conn.createStatement()) { + assertTrue("creating a statement starts a local object.", m.containsKey(id)); + assertTrue(stmt.execute("select count(1) from EMP")); + } + Connection remoteConn = getConnection(FullyRemoteJdbcMetaFactory.getInstance(), id); + final boolean defaultRO = remoteConn.isReadOnly(); + final boolean defaultAutoCommit = remoteConn.getAutoCommit(); + final String defaultCatalog = remoteConn.getCatalog(); + final String defaultSchema = remoteConn.getSchema(); + conn.setReadOnly(!defaultRO); + assertTrue("local changes dirty local state", m.get(id).isDirty()); + assertEquals("remote connection has not been touched", defaultRO, remoteConn.isReadOnly()); + conn.setAutoCommit(!defaultAutoCommit); + assertEquals("remote connection has not been touched", + defaultAutoCommit, remoteConn.getAutoCommit()); + + // further interaction with the connection will force a sync + try (final Statement stmt = conn.createStatement()) { + assertEquals(!defaultAutoCommit, remoteConn.getAutoCommit()); + assertFalse("local values should be clean", m.get(id).isDirty()); + } + } finally { + ConnectionSpec.getDatabaseLock().unlock(); + } + } + + @Test public void testQuery() throws Exception { + ConnectionSpec.getDatabaseLock().lock(); + try (AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url); + Statement statement = conn.createStatement()) { + assertFalse(statement.execute("SET SCHEMA \"SCOTT\"")); + assertFalse( + statement.execute( + "CREATE TABLE \"FOO\"(\"KEY\" INTEGER NOT NULL, \"VALUE\" VARCHAR(10))")); + assertFalse(statement.execute("SET TABLE \"FOO\" READONLY FALSE")); + + final int numRecords = 1000; + for (int i = 0; i < numRecords; i++) { + assertFalse(statement.execute("INSERT INTO \"FOO\" VALUES(" + i + ", '" + i + "')")); + } + + // Make sure all the records are there that we expect + ResultSet results = statement.executeQuery("SELECT count(KEY) FROM FOO"); + assertTrue(results.next()); + 
assertEquals(1000, results.getInt(1)); + assertFalse(results.next()); + + results = statement.executeQuery("SELECT KEY, VALUE FROM FOO ORDER BY KEY ASC"); + for (int i = 0; i < numRecords; i++) { + assertTrue(results.next()); + assertEquals(i, results.getInt(1)); + assertEquals(Integer.toString(i), results.getString(2)); + } + } finally { + ConnectionSpec.getDatabaseLock().unlock(); + } + } + + @Test public void testSingleUrlParsing() throws Exception { + AlternatingDriver d = new AlternatingDriver(); + List urls = d.parseUrls("http://localhost:1234"); + assertEquals(Arrays.asList(new URL("http://localhost:1234")), urls); + } + + @Test public void testMultipleUrlParsing() throws Exception { + AlternatingDriver d = new AlternatingDriver(); + List urls = d.parseUrls("http://localhost:1234,http://localhost:2345," + + "http://localhost:3456"); + List expectedUrls = Arrays.asList(new URL("http://localhost:1234"), + new URL("http://localhost:2345"), new URL("http://localhost:3456")); + assertEquals(expectedUrls, urls); + } +} + +// End AlternatingRemoteMetaTest.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java index e4d8690..c68f3cf 100644 --- a/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java +++ b/avatica-server/src/test/java/org/apache/calcite/avatica/remote/RemoteMetaTest.java @@ -35,6 +35,7 @@ import com.google.common.cache.Cache; import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.AfterClass; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -344,6 +345,7 @@ public class RemoteMetaTest { } } 
+ @Ignore("[CALCITE-942] AvaticaConnection should fail-fast when closed.") @Test public void testRemoteConnectionClosing() throws Exception { AvaticaConnection conn = (AvaticaConnection) DriverManager.getConnection(url); // Verify connection is usable @@ -354,9 +356,9 @@ public class RemoteMetaTest { try { conn.createStatement(); fail("expected exception"); - } catch (RuntimeException e) { + } catch (SQLException e) { assertThat(e.getMessage(), - containsString("Connection not found: invalid id, closed, or expired")); + containsString("Connection is closed")); } } http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java index a8867c8..03f2aa1 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaConnection.java @@ -16,7 +16,10 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; import org.apache.calcite.avatica.Meta.MetaResultSet; +import org.apache.calcite.avatica.remote.Service.ErrorResponse; +import org.apache.calcite.avatica.remote.Service.OpenConnectionRequest; import org.apache.calcite.avatica.remote.TypedValue; import java.sql.Array; @@ -57,6 +60,9 @@ public abstract class AvaticaConnection implements Connection { * the number of rows modified. */ public static final String ROWCOUNT_COLUMN_NAME = "ROWCOUNT"; + public static final String NUM_EXECUTE_RETRIES_KEY = "avatica.statement.retries"; + public static final String NUM_EXECUTE_RETRIES_DEFAULT = "5"; + /** The name of the sole column returned by an EXPLAIN statement. * *
Actually Avatica does not care what this column is called, but here is @@ -80,6 +86,7 @@ public abstract class AvaticaConnection implements Connection { public final Map properties = new HashMap<>(); public final Map statementMap = new ConcurrentHashMap<>(); + protected final long maxRetriesPerExecute; /** * Creates an AvaticaConnection. @@ -105,6 +112,14 @@ public abstract class AvaticaConnection implements Connection { this.meta = driver.createMeta(this); this.metaData = factory.newDatabaseMetaData(this); this.holdability = metaData.getResultSetHoldability(); + this.maxRetriesPerExecute = getNumStatementRetries(info); + } + + /** Computes the number of retries + * {@link #executeInternal(String)} should retry before failing. */ + long getNumStatementRetries(Properties props) { + return Long.valueOf(Objects.requireNonNull(props) + .getProperty(NUM_EXECUTE_RETRIES_KEY, NUM_EXECUTE_RETRIES_DEFAULT)); } /** Returns a view onto this connection's configuration properties. Code @@ -116,6 +131,14 @@ public abstract class AvaticaConnection implements Connection { return new ConnectionConfigImpl(info); } + /** + * Opens the connection on the server. 
+ */ + public void openConnection() { + // Open the connection on the server + this.meta.openConnection(handle, OpenConnectionRequest.serializeProperties(info)); + } + // Connection methods public AvaticaStatement createStatement() throws SQLException { @@ -412,11 +435,12 @@ public abstract class AvaticaConnection implements Connection { * @param statement Statement * @param signature Prepared query * @param firstFrame First frame of rows, or null if we need to execute + * @param state The state used to create the given result * @return Result set * @throws java.sql.SQLException if a database error occurs */ protected ResultSet executeQueryInternal(AvaticaStatement statement, - Meta.Signature signature, Meta.Frame firstFrame) throws SQLException { + Meta.Signature signature, Meta.Frame firstFrame, QueryState state) throws SQLException { // Close the previous open result set, if there is one. Meta.Frame frame = firstFrame; Meta.Signature signature2 = signature; @@ -453,8 +477,9 @@ public abstract class AvaticaConnection implements Connection { if (frame == null && signature2 == null && statement.updateCount != -1) { statement.openResultSet = null; } else { + // Duplicative SQL, for support non-prepared statements statement.openResultSet = - factory.newResultSet(statement, signature2, timeZone, frame); + factory.newResultSet(statement, state, signature2, timeZone, frame); } } // Release the monitor before executing, to give another thread the @@ -502,8 +527,8 @@ public abstract class AvaticaConnection implements Connection { } protected Meta.ExecuteResult prepareAndExecuteInternal( - final AvaticaStatement statement, String sql, long maxRowCount) - throws SQLException { + final AvaticaStatement statement, final String sql, long maxRowCount) + throws SQLException, NoSuchStatementException { final Meta.PrepareCallback callback = new Meta.PrepareCallback() { public Object getMonitor() { @@ -531,7 +556,7 @@ public abstract class AvaticaConnection implements Connection { 
statement.updateCount = updateCount; } else { final TimeZone timeZone = getTimeZone(); - statement.openResultSet = factory.newResultSet(statement, + statement.openResultSet = factory.newResultSet(statement, new QueryState(sql), signature, timeZone, firstFrame); } } @@ -546,13 +571,13 @@ public abstract class AvaticaConnection implements Connection { return meta.prepareAndExecute(statement.handle, sql, maxRowCount, callback); } - protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet) + protected ResultSet createResultSet(Meta.MetaResultSet metaResultSet, QueryState state) throws SQLException { final Meta.StatementHandle h = new Meta.StatementHandle( metaResultSet.connectionId, metaResultSet.statementId, null); final AvaticaStatement statement = lookupStatement(h); ResultSet resultSet = executeQueryInternal(statement, metaResultSet.signature.sanitize(), - metaResultSet.firstFrame); + metaResultSet.firstFrame, state); if (metaResultSet.ownStatement) { resultSet.getStatement().closeOnCompletion(); } @@ -617,6 +642,44 @@ public abstract class AvaticaConnection implements Connection { return connection.meta; } } + + /** + * A Callable-like interface but without a "throws Exception". + * + * @param The return type from {@code call}. + */ + public interface CallableWithoutException { + T call(); + } + + /** + * Invokes the given "callable", retrying the call when the server responds with an error + * denoting that the connection is missing on the server. + * + * @param callable The function to invoke. + * @return The value from the result of the callable. 
+ */ + public T invokeWithRetries(CallableWithoutException callable) { + RuntimeException lastException = null; + for (int i = 0; i < maxRetriesPerExecute; i++) { + try { + return callable.call(); + } catch (AvaticaClientRuntimeException e) { + lastException = e; + if (ErrorResponse.MISSING_CONNECTION_ERROR_CODE == e.getErrorCode()) { + this.openConnection(); + continue; + } + throw e; + } + } + if (null != lastException) { + throw lastException; + } else { + // Shouldn't ever happen. + throw new IllegalStateException(); + } + } } // End AvaticaConnection.java http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java index 27a2687..b57f36c 100644 --- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java +++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaDatabaseMetaData.java @@ -16,6 +16,8 @@ */ package org.apache.calcite.avatica; +import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException; +import org.apache.calcite.avatica.remote.MetaDataOperation; import org.apache.calcite.avatica.util.Casing; import org.apache.calcite.avatica.util.Quoting; @@ -182,28 +184,53 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData { } public String getSQLKeywords() throws SQLException { - return Meta.DatabaseProperty.GET_S_Q_L_KEYWORDS - .getProp(connection.meta, connection.handle, String.class); + return connection.invokeWithRetries( + new CallableWithoutException() { + public String call() { + return Meta.DatabaseProperty.GET_S_Q_L_KEYWORDS + .getProp(connection.meta, connection.handle, String.class); + } + }); } public String getNumericFunctions() throws SQLException { - return 
Meta.DatabaseProperty.GET_NUMERIC_FUNCTIONS - .getProp(connection.meta, connection.handle, String.class); + return connection.invokeWithRetries( + new CallableWithoutException() { + public String call() { + return Meta.DatabaseProperty.GET_NUMERIC_FUNCTIONS + .getProp(connection.meta, connection.handle, String.class); + } + }); } public String getStringFunctions() throws SQLException { - return Meta.DatabaseProperty.GET_STRING_FUNCTIONS - .getProp(connection.meta, connection.handle, String.class); + return connection.invokeWithRetries( + new CallableWithoutException() { + public String call() { + return Meta.DatabaseProperty.GET_STRING_FUNCTIONS + .getProp(connection.meta, connection.handle, String.class); + } + }); } public String getSystemFunctions() throws SQLException { - return Meta.DatabaseProperty.GET_SYSTEM_FUNCTIONS - .getProp(connection.meta, connection.handle, String.class); + return connection.invokeWithRetries( + new CallableWithoutException() { + public String call() { + return Meta.DatabaseProperty.GET_SYSTEM_FUNCTIONS + .getProp(connection.meta, connection.handle, String.class); + } + }); } public String getTimeDateFunctions() throws SQLException { - return Meta.DatabaseProperty.GET_TIME_DATE_FUNCTIONS - .getProp(connection.meta, connection.handle, String.class); + return connection.invokeWithRetries( + new CallableWithoutException() { + public String call() { + return Meta.DatabaseProperty.GET_TIME_DATE_FUNCTIONS + .getProp(connection.meta, connection.handle, String.class); + } + }); } public String getSearchStringEscape() throws SQLException { @@ -234,8 +261,7 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData { return true; } - public boolean supportsConvert( - int fromType, int toType) throws SQLException { + public boolean supportsConvert(int fromType, int toType) throws SQLException { return false; // TODO: more detail } @@ -528,8 +554,13 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData { } public int 
getDefaultTransactionIsolation() throws SQLException { - return Meta.DatabaseProperty.GET_DEFAULT_TRANSACTION_ISOLATION - .getProp(connection.meta, connection.handle, Integer.class); + return connection.invokeWithRetries( + new CallableWithoutException() { + public Integer call() { + return Meta.DatabaseProperty.GET_DEFAULT_TRANSACTION_ISOLATION + .getProp(connection.meta, connection.handle, Integer.class); + } + }); } public boolean supportsTransactions() throws SQLException { @@ -560,33 +591,90 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData { } public ResultSet getProcedures( - String catalog, - String schemaPattern, - String procedureNamePattern) throws SQLException { - return connection.createResultSet( - connection.meta.getProcedures(connection.handle, catalog, pat(schemaPattern), - pat(procedureNamePattern))); + final String catalog, + final String schemaPattern, + final String procedureNamePattern) throws SQLException { + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getProcedures(connection.handle, catalog, pat(schemaPattern), + pat(procedureNamePattern)), + new QueryState(MetaDataOperation.GET_PROCEDURES, catalog, schemaPattern, + procedureNamePattern)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getProcedureColumns( - String catalog, - String schemaPattern, - String procedureNamePattern, - String columnNamePattern) throws SQLException { - return connection.createResultSet( - connection.meta.getProcedureColumns(connection.handle, catalog, pat(schemaPattern), - pat(procedureNamePattern), pat(columnNamePattern))); + final String catalog, + final String schemaPattern, + final String procedureNamePattern, + final 
String columnNamePattern) throws SQLException { + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getProcedureColumns(connection.handle, catalog, + pat(schemaPattern), pat(procedureNamePattern), pat(columnNamePattern)), + new QueryState(MetaDataOperation.GET_PROCEDURE_COLUMNS, catalog, schemaPattern, + procedureNamePattern, columnNamePattern)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getTables( - String catalog, + final String catalog, final String schemaPattern, - String tableNamePattern, - String[] types) throws SQLException { - List typeList = types == null ? null : Arrays.asList(types); - return connection.createResultSet( - connection.meta.getTables(connection.handle, catalog, pat(schemaPattern), - pat(tableNamePattern), typeList)); + final String tableNamePattern, + final String[] types) throws SQLException { + final List typeList = types == null ? 
null : Arrays.asList(types); + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getTables(connection.handle, catalog, pat(schemaPattern), + pat(tableNamePattern), typeList), + new QueryState(MetaDataOperation.GET_TABLES, catalog, schemaPattern, + tableNamePattern, types)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } private static Meta.Pat pat(String schemaPattern) { @@ -594,11 +682,30 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData { } public ResultSet getSchemas( - String catalog, String schemaPattern) throws SQLException { + final String catalog, final String schemaPattern) throws SQLException { // TODO: add a 'catch ... throw new SQLException' logic to this and other // getXxx methods. 
Right now any error will throw a RuntimeException - return connection.createResultSet( - connection.meta.getSchemas(connection.handle, catalog, pat(schemaPattern))); + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getSchemas(connection.handle, catalog, pat(schemaPattern)), + new QueryState(MetaDataOperation.GET_SCHEMAS_WITH_ARGS, catalog, + schemaPattern)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getSchemas() throws SQLException { @@ -606,103 +713,342 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData { } public ResultSet getCatalogs() throws SQLException { - return connection.createResultSet(connection.meta.getCatalogs(connection.handle)); + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet(connection.meta.getCatalogs(connection.handle), + new QueryState(MetaDataOperation.GET_CATALOGS)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getTableTypes() throws SQLException { - return connection.createResultSet(connection.meta.getTableTypes(connection.handle)); + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet(connection.meta.getTableTypes(connection.handle), + new QueryState(MetaDataOperation.GET_TABLE_TYPES)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + 
Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getColumns( - String catalog, - String schemaPattern, - String tableNamePattern, - String columnNamePattern) throws SQLException { - return connection.createResultSet( - connection.meta.getColumns(connection.handle, - catalog, pat(schemaPattern), - pat(tableNamePattern), pat(columnNamePattern))); + final String catalog, + final String schemaPattern, + final String tableNamePattern, + final String columnNamePattern) throws SQLException { + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getColumns(connection.handle, catalog, pat(schemaPattern), + pat(tableNamePattern), pat(columnNamePattern)), + new QueryState(MetaDataOperation.GET_COLUMNS, catalog, schemaPattern, + tableNamePattern, columnNamePattern)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getColumnPrivileges( - String catalog, - String schema, - String table, - String columnNamePattern) throws SQLException { - return connection.createResultSet( - connection.meta.getColumnPrivileges(connection.handle, catalog, schema, table, - pat(columnNamePattern))); + final String catalog, + final String schema, + final String table, + final String columnNamePattern) throws SQLException { + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getColumnPrivileges(connection.handle, catalog, schema, table, + pat(columnNamePattern)), + new QueryState(MetaDataOperation.GET_COLUMN_PRIVILEGES, catalog, schema, table, + columnNamePattern)); + } catch 
(SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getTablePrivileges( - String catalog, - String schemaPattern, - String tableNamePattern) throws SQLException { - return connection.createResultSet( - connection.meta.getTablePrivileges(connection.handle, catalog, pat(schemaPattern), - pat(tableNamePattern))); + final String catalog, + final String schemaPattern, + final String tableNamePattern) throws SQLException { + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getTablePrivileges(connection.handle, catalog, + pat(schemaPattern), pat(tableNamePattern)), + new QueryState(MetaDataOperation.GET_TABLE_PRIVILEGES, catalog, schemaPattern, + tableNamePattern)); + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + }); + } catch (RuntimeException e) { + Throwable cause = e.getCause(); + if (cause instanceof SQLException) { + throw (SQLException) cause; + } + throw e; + } } public ResultSet getBestRowIdentifier( - String catalog, - String schema, - String table, - int scope, - boolean nullable) throws SQLException { - return connection.createResultSet( - connection.meta.getBestRowIdentifier(connection.handle, catalog, schema, table, scope, - nullable)); + final String catalog, + final String schema, + final String table, + final int scope, + final boolean nullable) throws SQLException { + try { + return connection.invokeWithRetries( + new CallableWithoutException() { + public ResultSet call() { + try { + return connection.createResultSet( + connection.meta.getBestRowIdentifier(connection.handle, catalog, schema, table, + scope, nullable), + new QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, catalog, table, scope, + nullable)); + } catch 
(SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getVersionColumns(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getVersionColumns(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getVersionColumns(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_VERSION_COLUMNS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getPrimaryKeys(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getPrimaryKeys(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getPrimaryKeys(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_PRIMARY_KEYS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getImportedKeys(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getImportedKeys(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getImportedKeys(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_IMPORTED_KEYS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getExportedKeys(
-      String catalog, String schema, String table) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getExportedKeys(connection.handle, catalog, schema, table));
+      final String catalog, final String schema, final String table) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getExportedKeys(connection.handle, catalog, schema, table),
+                    new QueryState(MetaDataOperation.GET_EXPORTED_KEYS, catalog, schema, table));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getCrossReference(
-      String parentCatalog,
-      String parentSchema,
-      String parentTable,
-      String foreignCatalog,
-      String foreignSchema,
-      String foreignTable) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getCrossReference(connection.handle, parentCatalog, parentSchema,
-            parentTable, foreignCatalog, foreignSchema, foreignTable));
+      final String parentCatalog,
+      final String parentSchema,
+      final String parentTable,
+      final String foreignCatalog,
+      final String foreignSchema,
+      final String foreignTable) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getCrossReference(connection.handle, parentCatalog,
+                        parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable),
+                    new QueryState(MetaDataOperation.GET_CROSS_REFERENCE, parentCatalog,
+                        parentSchema, parentTable, foreignCatalog, foreignSchema, foreignTable));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getTypeInfo() throws SQLException {
-    return connection.createResultSet(connection.meta.getTypeInfo(connection.handle));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(connection.meta.getTypeInfo(connection.handle),
+                    new QueryState(MetaDataOperation.GET_TYPE_INFO));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getIndexInfo(
-      String catalog,
-      String schema,
-      String table,
-      boolean unique,
-      boolean approximate) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getIndexInfo(connection.handle, catalog, schema, table, unique,
-            approximate));
+      final String catalog,
+      final String schema,
+      final String table,
+      final boolean unique,
+      final boolean approximate) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getIndexInfo(connection.handle, catalog, schema, table, unique,
+                        approximate),
+                    new QueryState(MetaDataOperation.GET_INDEX_INFO, catalog, schema, table, unique,
+                        approximate));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public boolean supportsResultSetType(int type) throws SQLException {
@@ -756,13 +1102,32 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }

   public ResultSet getUDTs(
-      String catalog,
-      String schemaPattern,
-      String typeNamePattern,
-      int[] types) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getUDTs(connection.handle, catalog, pat(schemaPattern),
-            pat(typeNamePattern), types));
+      final String catalog,
+      final String schemaPattern,
+      final String typeNamePattern,
+      final int[] types) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getUDTs(connection.handle, catalog, pat(schemaPattern),
+                        pat(typeNamePattern), types),
+                    new QueryState(MetaDataOperation.GET_UDTS, catalog, schemaPattern,
+                        typeNamePattern, types));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public Connection getConnection() throws SQLException {
@@ -786,31 +1151,88 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }

   public ResultSet getSuperTypes(
-      String catalog,
-      String schemaPattern,
-      String typeNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getSuperTypes(connection.handle, catalog, pat(schemaPattern),
-            pat(typeNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String typeNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getSuperTypes(connection.handle, catalog, pat(schemaPattern),
+                        pat(typeNamePattern)),
+                    new QueryState(MetaDataOperation.GET_SUPER_TYPES, catalog, schemaPattern,
+                        typeNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getSuperTables(
-      String catalog,
-      String schemaPattern,
-      String tableNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getSuperTables(connection.handle, catalog, pat(schemaPattern),
-            pat(tableNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String tableNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getSuperTables(connection.handle, catalog, pat(schemaPattern),
+                        pat(tableNamePattern)),
+                    new QueryState(MetaDataOperation.GET_SUPER_TABLES, catalog, schemaPattern,
+                        tableNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getAttributes(
-      String catalog,
-      String schemaPattern,
-      String typeNamePattern,
-      String attributeNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getAttributes(connection.handle, catalog, pat(schemaPattern),
-            pat(typeNamePattern), pat(attributeNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String typeNamePattern,
+      final String attributeNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getAttributes(connection.handle, catalog, pat(schemaPattern),
+                        pat(typeNamePattern), pat(attributeNamePattern)),
+                    new QueryState(MetaDataOperation.GET_ATTRIBUTES, catalog, schemaPattern,
+                        typeNamePattern, attributeNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public boolean supportsResultSetHoldability(int holdability)
@@ -864,37 +1286,112 @@ public class AvaticaDatabaseMetaData implements DatabaseMetaData {
   }

   public ResultSet getClientInfoProperties() throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getClientInfoProperties(connection.handle));
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getClientInfoProperties(connection.handle),
+                    new QueryState(MetaDataOperation.GET_CLIENT_INFO_PROPERTIES));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getFunctions(
-      String catalog,
-      String schemaPattern,
-      String functionNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getFunctions(connection.handle, catalog, pat(schemaPattern),
-            pat(functionNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String functionNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getFunctions(connection.handle, catalog, pat(schemaPattern),
+                        pat(functionNamePattern)),
+                    new QueryState(MetaDataOperation.GET_FUNCTIONS, catalog, schemaPattern,
+                        functionNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getFunctionColumns(
-      String catalog,
-      String schemaPattern,
-      String functionNamePattern,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getFunctionColumns(connection.handle, catalog, pat(schemaPattern),
-            pat(functionNamePattern), pat(columnNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String functionNamePattern,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getFunctionColumns(connection.handle, catalog,
+                        pat(schemaPattern), pat(functionNamePattern), pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_FUNCTION_COLUMNS, catalog,
+                        schemaPattern, functionNamePattern, columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public ResultSet getPseudoColumns(
-      String catalog,
-      String schemaPattern,
-      String tableNamePattern,
-      String columnNamePattern) throws SQLException {
-    return connection.createResultSet(
-        connection.meta.getPseudoColumns(connection.handle, catalog, pat(schemaPattern),
-            pat(tableNamePattern), pat(columnNamePattern)));
+      final String catalog,
+      final String schemaPattern,
+      final String tableNamePattern,
+      final String columnNamePattern) throws SQLException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ResultSet>() {
+            public ResultSet call() {
+              try {
+                return connection.createResultSet(
+                    connection.meta.getPseudoColumns(connection.handle, catalog, pat(schemaPattern),
+                        pat(tableNamePattern), pat(columnNamePattern)),
+                    new QueryState(MetaDataOperation.GET_PSEUDO_COLUMNS, catalog, schemaPattern,
+                        tableNamePattern, columnNamePattern));
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof SQLException) {
+        throw (SQLException) cause;
+      }
+      throw e;
+    }
   }

   public boolean generatedKeyAlwaysReturned() throws SQLException {

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
index c6d2ede..1a2a97f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaFactory.java
@@ -51,13 +51,14 @@ public interface AvaticaFactory {
    * {@link AvaticaResultSet#execute()} on it.
    *
    * @param statement Statement
+   * @param state The state used to create this result set
    * @param signature Prepared statement
    * @param timeZone Time zone
    * @param firstFrame Frame containing the first (or perhaps only) rows in the
    *                   result, or null if an execute/fetch is required
    * @return Result set
    */
-  AvaticaResultSet newResultSet(AvaticaStatement statement,
+  AvaticaResultSet newResultSet(AvaticaStatement statement, QueryState state,
       Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame)
       throws SQLException;
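Note on the pattern repeated in the AvaticaDatabaseMetaData hunks above: each
metadata call is wrapped in a callable that cannot throw checked exceptions, so
an SQLException is tunnelled through a RuntimeException and then unwrapped by
the caller. The sketch below is not code from this commit. It is a minimal,
self-contained illustration of that wrap/retry/unwrap shape, under assumptions:
the local CallableWithoutException interface only mirrors the name used in the
diff, and MissingStateException, RetrySketch, callUnwrapping and the
maxAttempts parameter are hypothetical names introduced for the example. The
real invokeWithRetries in AvaticaConnection may decide when to retry quite
differently.

```java
import java.sql.SQLException;

/** Hypothetical stand-in for the CallableWithoutException type named in the diff. */
interface CallableWithoutException<T> {
  T call();
}

/** Hypothetical signal that the server no longer holds the state the client expects. */
class MissingStateException extends RuntimeException {
  MissingStateException(String message) {
    super(message);
  }
}

final class RetrySketch {
  private RetrySketch() {}

  /** Runs the callable up to maxAttempts times, retrying only on MissingStateException. */
  static <T> T invokeWithRetries(CallableWithoutException<T> callable, int maxAttempts) {
    MissingStateException last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      try {
        return callable.call();
      } catch (MissingStateException e) {
        // Assumption: a real client would rebuild the server-side state here.
        last = e;
      }
    }
    if (last == null) {
      throw new IllegalArgumentException("maxAttempts must be positive");
    }
    throw last;
  }

  /** Factors out the catch block repeated in the diff: rethrow a tunnelled SQLException. */
  static <T> T callUnwrapping(CallableWithoutException<T> callable, int maxAttempts)
      throws SQLException {
    try {
      return invokeWithRetries(callable, maxAttempts);
    } catch (RuntimeException e) {
      Throwable cause = e.getCause();
      if (cause instanceof SQLException) {
        throw (SQLException) cause;
      }
      throw e;
    }
  }

  public static void main(String[] args) throws SQLException {
    final int[] attempts = {0};
    // Fails twice with missing state, then succeeds on the third attempt.
    String result = callUnwrapping(() -> {
      if (++attempts[0] < 3) {
        throw new MissingStateException("server lost state");
      }
      return "ok";
    }, 5);
    System.out.println(result + " after " + attempts[0] + " attempts");
  }
}
```

A helper such as callUnwrapping would let each metadata method keep only its
own callable and drop the duplicated catch block. Whether that refactoring
suits this code base is a design choice the commit does not address.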