From: sboikov@apache.org
To: commits@ignite.apache.org
Reply-To: dev@ignite.apache.org
Date: Fri, 26 May 2017 14:06:03 -0000
Subject: [03/39] ignite git commit: IGNITE-5163: Implemented infrastructure for the new JDBC driver. This closes #1912.

IGNITE-5163: Implemented infrastructure for the new JDBC driver. This closes #1912.
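[Editor's note] This commit adds the new thin JDBC driver (org.apache.ignite.IgniteJdbcThinDriver) and the server-side SQL listener infrastructure it talks to. Based on the driver's javadoc and the public constants introduced by the patch, opening and validating a connection is expected to look roughly like the sketch below. Note that this commit only implements the connection infrastructure (createStatement()/prepareStatement() still return null), so the sketch stops at the connection level and assumes a node with the SQL listener enabled is already running on localhost; it is illustrative and not part of the patch.

import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

import org.apache.ignite.IgniteJdbcThinDriver;

public class ThinJdbcConnectExample {
    public static void main(String[] args) throws Exception {
        // Register the thin driver added by this commit (it also self-registers
        // in a static initializer once the class is loaded).
        Class.forName("org.apache.ignite.IgniteJdbcThinDriver");

        // Optional flags are read from the Properties object; using the public
        // constants avoids guessing the exact property-name spelling.
        Properties props = new Properties();
        props.setProperty(IgniteJdbcThinDriver.PROP_DISTRIBUTED_JOINS, "true");

        // 10800 is the default listener port (OdbcConfiguration.DFLT_TCP_PORT_FROM).
        try (Connection conn = DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1:10800", props)) {
            System.out.println("Connected, valid = " + conn.isValid(2));
        }
    }
}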
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f1dc3ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f1dc3ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f1dc3ac

Branch: refs/heads/ignite-5075-cc-debug
Commit: 6f1dc3ac65d403a634331515cd1f279010d0d092
Parents: c04b39a
Author: tledkov-gridgain
Authored: Tue May 23 15:55:48 2017 +0300
Committer: devozerov
Committed: Tue May 23 15:55:48 2017 +0300

----------------------------------------------------------------------
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   3 +
 .../jdbc/thin/JdbcConnectionSelfTest.java       | 195 +++++++
 .../org/apache/ignite/IgniteJdbcThinDriver.java | 312 +++++++++++
 .../ignite/internal/GridKernalContext.java      |   8 +-
 .../ignite/internal/GridKernalContextImpl.java  |  12 +-
 .../apache/ignite/internal/IgniteKernal.java    |   4 +-
 .../internal/binary/BinaryWriterExImpl.java     |  16 +-
 .../internal/jdbc/thin/JdbcConnection.java      | 529 +++++++++++++++++++
 .../ignite/internal/jdbc/thin/JdbcTcpIo.java    | 207 ++++++++
 .../processors/odbc/OdbcNioListener.java        | 242 ---------
 .../internal/processors/odbc/OdbcProcessor.java | 199 -------
 .../odbc/SqlListenerAbstractMessageParser.java  | 265 ++++++++++
 .../odbc/SqlListenerAbstractObjectReader.java   | 137 +++++
 .../odbc/SqlListenerAbstractObjectWriter.java   | 111 ++++
 .../processors/odbc/SqlListenerNioListener.java | 263 +++++++++
 .../processors/odbc/SqlListenerProcessor.java   | 191 +++++++
 .../odbc/SqlListenerRequestHandlerImpl.java     | 494 +++++++++++++++++
 .../processors/odbc/jdbc/JdbcMessageParser.java |  50 ++
 .../processors/odbc/jdbc/JdbcObjectReader.java  |  33 ++
 .../processors/odbc/jdbc/JdbcObjectWriter.java  |  33 ++
 .../processors/odbc/odbc/OdbcMessageParser.java | 249 +-------
 .../processors/odbc/odbc/OdbcObjectReader.java  |  33 ++
 .../processors/odbc/odbc/OdbcObjectWriter.java  |  32 ++
 .../odbc/odbc/OdbcRequestHandler.java           | 513 ------------------
 .../odbc/OdbcProcessorValidationSelfTest.java   | 182 -------
 .../SqlListenerProcessorValidationSelfTest.java | 184 +++++++
 .../ignite/testframework/GridTestUtils.java     |   2 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   4 +-
 .../cpp/odbc/include/ignite/odbc/message.h      |  10 +
 29 files changed, 3124 insertions(+), 1389 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 75671de..e2f09ba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -81,6 +81,9 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalPartitionedSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalReplicatedSelfTest.class));

+        // New thin JDBC
+        suite.addTest(new TestSuite(org.apache.ignite.jdbc.thin.JdbcConnectionSelfTest.class));
+
         return suite;
     }
 }
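[Editor's note] The connection test that follows enables the server-side SQL listener by setting an OdbcConfiguration on the node. Outside the test framework, a minimal node that the thin driver can connect to might be started along the same lines as the sketch below (an illustrative sketch assuming default settings; not part of the patch):

import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.OdbcConfiguration;

public class ThinJdbcServerNode {
    public static void main(String[] args) {
        IgniteConfiguration cfg = new IgniteConfiguration();

        // The SQL listener (shared by ODBC and the new thin JDBC driver) is
        // started only when an OdbcConfiguration is set; by default it binds
        // to the port range starting at 10800.
        cfg.setOdbcConfiguration(new OdbcConfiguration());

        Ignite ignite = Ignition.start(cfg);

        System.out.println("Node started: " + ignite.name());
    }
}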
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java new file mode 100644 index 0000000..d7e2bef --- /dev/null +++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.jdbc.thin; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.concurrent.Callable; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.OdbcConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +/** + * Connection test. + */ +public class JdbcConnectionSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** URL prefix. */ + private static final String URL_PREFIX = "jdbc:ignite:thin://"; + + /** Host. */ + private static final String HOST = "127.0.0.1"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME)); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setMarshaller(new BinaryMarshaller()); + + cfg.setOdbcConfiguration(new OdbcConfiguration()); + + return cfg; + } + + /** + * @param name Cache name. + * @return Cache configuration. + * @throws Exception In case of error. 
+ */ + private CacheConfiguration cacheConfiguration(@NotNull String name) throws Exception { + CacheConfiguration cfg = defaultCacheConfiguration(); + + cfg.setName(name); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + try { + Driver drv = DriverManager.getDriver("jdbc:ignite://"); + + if (drv != null) + DriverManager.deregisterDriver(drv); + } catch (SQLException ignored) { + // No-op. + } + + startGridsMultiThreaded(2); + + Class.forName("org.apache.ignite.IgniteJdbcThinDriver"); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testDefaults() throws Exception { + String url = URL_PREFIX + HOST; + + assert DriverManager.getConnection(url) != null; + assert DriverManager.getConnection(url + "/") != null; + } + + /** + * @throws Exception If failed. + */ + public void testInvalidUrls() throws Exception { + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Void call() throws Exception { + DriverManager.getConnection(URL_PREFIX + "127.0.0.1:80"); + + return null; + } + }, SQLException.class, "Failed to connect to Ignite cluster [host=127.0.0.1, port=80]"); + + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Void call() throws Exception { + DriverManager.getConnection("q"); + + return null; + } + }, SQLException.class, "URL is invalid"); + + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Void call() throws Exception { + DriverManager.getConnection(URL_PREFIX + "127.0.0.1:-1"); + + return null; + } + }, SQLException.class, "Invalid port:"); + + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Void call() throws Exception { + DriverManager.getConnection(URL_PREFIX + "127.0.0.1:0"); + + return null; + } + }, SQLException.class, "Invalid port:"); + + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Void call() throws Exception { + DriverManager.getConnection(URL_PREFIX + "127.0.0.1:100000"); + + return null; + } + }, SQLException.class, "Invalid port:"); + + GridTestUtils.assertThrowsAnyCause(log, new Callable() { + @Override public Void call() throws Exception { + DriverManager.getConnection(URL_PREFIX + " :10000"); + + return null; + } + }, SQLException.class, "Host name is empty"); + } + + /** + * @throws Exception If failed. 
+ */ + public void testClose() throws Exception { + String url = URL_PREFIX + HOST; + + final Connection conn = DriverManager.getConnection(url); + + assert conn != null; + assert !conn.isClosed(); + + conn.close(); + + assert conn.isClosed(); + + assert !conn.isValid(2): "Connection must be closed"; + + GridTestUtils.assertThrows( + log, + new Callable() { + @Override public Object call() throws Exception { + conn.isValid(-2); + + return null; + } + }, + SQLException.class, + "Invalid timeout" + ); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java new file mode 100644 index 0000000..19e1edd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.logging.Logger; +import org.apache.ignite.cache.affinity.AffinityKey; +import org.apache.ignite.configuration.OdbcConfiguration; +import org.apache.ignite.internal.IgniteVersionUtils; +import org.apache.ignite.internal.jdbc.JdbcDriverPropertyInfo; +import org.apache.ignite.internal.jdbc.thin.JdbcConnection; + +/** + * JDBC driver thin implementation for In-Memory Data Grid. + *

+ * The driver allows you to get distributed data from an Ignite cache using standard
+ * SQL queries and the standard JDBC API. It will automatically fetch only the fields
+ * you actually need from the objects stored in the cache.
+ *
+ * Limitations
+ *
+ * Data in an Ignite cache is usually distributed across several nodes, so some
+ * queries may not work as expected, since a query is sent to each individual node
+ * and the results are then collected and returned as a JDBC result set.
+ * Keep in mind the following limitations (they do not apply if data is queried from
+ * one node only, or if data is fully co-located or fully replicated on multiple nodes):
+ *
+ *   • Joins will work correctly only if the joined objects are stored in
+ *     collocated mode. Refer to the {@link AffinityKey} javadoc for more details.
+ *
+ *   • Note that if you are connected to a local or replicated cache, all data will
+ *     be queried on one node only, regardless of which caches participate in
+ *     the query (some data from a partitioned cache can be lost). And vice versa,
+ *     if you are connected to a partitioned cache, data from replicated caches
+ *     will be duplicated.
+ *
+ * SQL Notice
+ *
+ * The driver allows you to query data from several caches. The cache that the driver
+ * is connected to is treated as the default schema in this case. Other caches can be
+ * referenced by their names.
+ *
+ * Dependencies
+ *
+ * The JDBC driver is located in the main Ignite JAR in the {@code IGNITE_HOME/libs} folder.
+ *
+ * Configuration
+ *
+ * The JDBC connection URL has the following pattern:
+ * {@code jdbc:ignite:thin://<hostname>:<port>}
+ * Note the following:
+ *
+ *   • Hostname is required.
+ *   • If the port is not defined, {@code 10800} is used (the default for the Ignite thin client).
+ *
+ * Other properties can be defined in the {@link Properties} object passed to the
+ * {@link DriverManager#getConnection(String, Properties)} method:
+ *
+ *   • ignite.jdbc.distributedJoins: flag to enable distributed joins.
+ *     Default: {@code false} (distributed joins are disabled). Optional.
+ *   • ignite.jdbc.enforceJoinOrder: flag to enforce the join order of tables in the query.
+ *     Default: {@code false} (enforcing join order is disabled). Optional.
+ *
+ * Example
+ *
+ * // Register JDBC driver.
+ * Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+ *
+ * // Open JDBC connection.
+ * Connection conn = DriverManager.getConnection("jdbc:ignite:thin://localhost:10800");
+ *
+ * // Query persons' names.
+ * ResultSet rs = conn.createStatement().executeQuery("select name from Person");
+ *
+ * while (rs.next()) {
+ *     String name = rs.getString(1);
+ *
+ *     ...
+ * }
+ *
+ * // Query persons with a specific age.
+ * PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
+ *
+ * stmt.setInt(1, 30);
+ *
+ * ResultSet rs = stmt.executeQuery();
+ *
+ * while (rs.next()) {
+ *     String name = rs.getString("name");
+ *     int age = rs.getInt("age");
+ *
+ *     ...
+ * }
+ *
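+ * [Editor's note, not part of the original patch: parseJdbcUrl()/parseParameters()
+ * below also accept the optional flags as URL query parameters, which the javadoc
+ * above does not mention. A connection string using that form would look roughly
+ * like this:]
+ *
+ * Connection conn = DriverManager.getConnection(
+ *     "jdbc:ignite:thin://localhost:10800?distributedJoins=true&enforceJoinOrder=true");
+ *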
+ */ +@SuppressWarnings("JavadocReference") +public class IgniteJdbcThinDriver implements Driver { + /** Prefix for property names. */ + private static final String PROP_PREFIX = "ignite.jdbc"; + + /** Distributed joins parameter name. */ + private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins"; + + /** Enforce join order parameter name. */ + private static final String ENFORCE_JOIN_ORDER = "enforceJoinOrder"; + + /** Hostname property name. */ + public static final String PROP_HOST = PROP_PREFIX + "host"; + + /** Port number property name. */ + public static final String PROP_PORT = PROP_PREFIX + "port"; + + /** Distributed joins property name. */ + public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS; + + /** Transactions allowed property name. */ + public static final String PROP_ENFORCE_JOIN_ORDER = PROP_PREFIX + ENFORCE_JOIN_ORDER; + + /** URL prefix. */ + public static final String URL_PREFIX = "jdbc:ignite:thin://"; + + /** Default port. */ + public static final int DFLT_PORT = OdbcConfiguration.DFLT_TCP_PORT_FROM; + + /** Major version. */ + private static final int MAJOR_VER = IgniteVersionUtils.VER.major(); + + /** Minor version. */ + private static final int MINOR_VER = IgniteVersionUtils.VER.minor(); + + /* + * Register driver. + */ + static { + try { + DriverManager.registerDriver(new IgniteJdbcThinDriver()); + } + catch (SQLException e) { + throw new RuntimeException("Failed to register Ignite JDBC driver.", e); + } + } + + /** {@inheritDoc} */ + @Override public Connection connect(String url, Properties props) throws SQLException { + if (!parseUrl(url, props)) + throw new SQLException("URL is invalid: " + url); + + return new JdbcConnection(url, props); + } + + /** {@inheritDoc} */ + @Override public boolean acceptsURL(String url) throws SQLException { + return url.startsWith(URL_PREFIX); + } + + /** {@inheritDoc} */ + @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException { + if (!parseUrl(url, info)) + throw new SQLException("URL is invalid: " + url); + + List props = Arrays.asList( + new JdbcDriverPropertyInfo("Hostname", info.getProperty(PROP_HOST), ""), + new JdbcDriverPropertyInfo("Port number", info.getProperty(PROP_PORT), ""), + new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(PROP_DISTRIBUTED_JOINS), ""), + new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(PROP_ENFORCE_JOIN_ORDER), "") + ); + + return props.toArray(new DriverPropertyInfo[0]); + } + + /** {@inheritDoc} */ + @Override public int getMajorVersion() { + return MAJOR_VER; + } + + /** {@inheritDoc} */ + @Override public int getMinorVersion() { + return MINOR_VER; + } + + /** {@inheritDoc} */ + @Override public boolean jdbcCompliant() { + return false; + } + + /** {@inheritDoc} */ + @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException { + throw new SQLFeatureNotSupportedException("java.util.logging is not used."); + } + + /** + * Validates and parses connection URL. + * + * @param props Properties. + * @param url URL. + * @return Whether URL is valid. + */ + private boolean parseUrl(String url, Properties props) { + if (url == null) + return false; + + if (url.startsWith(URL_PREFIX) && url.length() > URL_PREFIX.length()) + return parseJdbcUrl(url, props); + + return false; + } + + /** + * @param url Url. + * @param props Properties. + * @return Whether URL is valid. 
+ */ + private boolean parseJdbcUrl(String url, Properties props) { + url = url.substring(URL_PREFIX.length()); + + String[] parts = url.split("\\?"); + + if (parts.length > 2) + return false; + + if (parts.length == 2) + if (!parseParameters(parts[1], "&", props)) + return false; + + parts = parts[0].split("/"); + + assert parts.length > 0; + + if (parts.length > 1) + return false; + + url = parts[0]; + + parts = url.split(":"); + + assert parts.length > 0; + + if (parts.length > 2) + return false; + + props.setProperty(PROP_HOST, parts[0]); + + try { + props.setProperty(PROP_PORT, String.valueOf(parts.length == 2 ? Integer.valueOf(parts[1]) : DFLT_PORT)); + } + catch (NumberFormatException ignored) { + return false; + } + + return true; + } + + /** + * Validates and parses URL parameters. + * + * @param val Parameters string. + * @param delim Delimiter. + * @param props Properties. + * @return Whether URL parameters string is valid. + */ + private boolean parseParameters(String val, String delim, Properties props) { + String[] params = val.split(delim); + + for (String param : params) { + String[] pair = param.split("="); + + if (pair.length != 2 || pair[0].isEmpty() || pair[1].isEmpty()) + return false; + + props.setProperty(PROP_PREFIX + pair[0], pair[1]); + } + + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 010bd21..7a01200 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -48,7 +48,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; -import org.apache.ignite.internal.processors.odbc.OdbcProcessor; +import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.pool.PoolProcessor; @@ -330,11 +330,11 @@ public interface GridKernalContext extends Iterable { public GridQueryProcessor query(); /** - * Gets ODBC processor. + * Gets SQL listener processor. * - * @return ODBC processor. + * @return SQL listener processor. */ - public OdbcProcessor odbc(); + public SqlListenerProcessor sqlListener(); /** * @return Plugin processor. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index bbc9846..262c5eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -64,7 +64,7 @@ import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; -import org.apache.ignite.internal.processors.odbc.OdbcProcessor; +import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; @@ -160,7 +160,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private OdbcProcessor odbcProc; + private SqlListenerProcessor sqlListenerProc; /** */ @GridToStringInclude @@ -567,8 +567,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable pluginProc = (IgnitePluginProcessor)comp; else if (comp instanceof GridQueryProcessor) qryProc = (GridQueryProcessor)comp; - else if (comp instanceof OdbcProcessor) - odbcProc = (OdbcProcessor)comp; + else if (comp instanceof SqlListenerProcessor) + sqlListenerProc = (SqlListenerProcessor)comp; else if (comp instanceof DataStructuresProcessor) dataStructuresProc = (DataStructuresProcessor)comp; else if (comp instanceof ClusterProcessor) @@ -824,8 +824,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public OdbcProcessor odbc() { - return odbcProc; + @Override public SqlListenerProcessor sqlListener() { + return sqlListenerProc; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 40476a7..c36fd7e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -125,7 +125,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor; -import org.apache.ignite.internal.processors.odbc.OdbcProcessor; +import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor; import 
org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor; @@ -924,7 +924,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx)); startProcessor(new GridCacheProcessor(ctx));startProcessor(new GridClusterStateProcessor(ctx)); startProcessor(new GridQueryProcessor(ctx)); - startProcessor(new OdbcProcessor(ctx)); + startProcessor(new SqlListenerProcessor(ctx)); startProcessor(new GridServiceProcessor(ctx)); startProcessor(new GridTaskSessionProcessor(ctx)); startProcessor(new GridJobProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java index 7b5e9d3..7efe4b3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java @@ -938,7 +938,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeByteFieldPrimitive(byte val) { + public void writeByteFieldPrimitive(byte val) { out.unsafeEnsure(1 + 1); out.unsafeWriteByte(GridBinaryMarshaller.BYTE); @@ -965,7 +965,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeShortFieldPrimitive(short val) { + public void writeShortFieldPrimitive(short val) { out.unsafeEnsure(1 + 2); out.unsafeWriteByte(GridBinaryMarshaller.SHORT); @@ -985,7 +985,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeIntFieldPrimitive(int val) { + public void writeIntFieldPrimitive(int val) { out.unsafeEnsure(1 + 4); out.unsafeWriteByte(GridBinaryMarshaller.INT); @@ -1005,7 +1005,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeLongFieldPrimitive(long val) { + public void writeLongFieldPrimitive(long val) { out.unsafeEnsure(1 + 8); out.unsafeWriteByte(GridBinaryMarshaller.LONG); @@ -1025,7 +1025,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeFloatFieldPrimitive(float val) { + public void writeFloatFieldPrimitive(float val) { out.unsafeEnsure(1 + 4); out.unsafeWriteByte(GridBinaryMarshaller.FLOAT); @@ -1045,7 +1045,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeDoubleFieldPrimitive(double val) { + public void writeDoubleFieldPrimitive(double val) { out.unsafeEnsure(1 + 8); out.unsafeWriteByte(GridBinaryMarshaller.DOUBLE); @@ -1065,7 +1065,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. */ - void writeCharFieldPrimitive(char val) { + public void writeCharFieldPrimitive(char val) { out.unsafeEnsure(1 + 2); out.unsafeWriteByte(GridBinaryMarshaller.CHAR); @@ -1085,7 +1085,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje /** * @param val Value. 
*/ - void writeBooleanFieldPrimitive(boolean val) { + public void writeBooleanFieldPrimitive(boolean val) { out.unsafeEnsure(1 + 1); out.unsafeWriteByte(GridBinaryMarshaller.BOOLEAN); http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java new file mode 100644 index 0000000..25d62b4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc.thin; + +import java.sql.Array; +import java.sql.Blob; +import java.sql.CallableStatement; +import java.sql.Clob; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.NClob; +import java.sql.PreparedStatement; +import java.sql.SQLClientInfoException; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Savepoint; +import java.sql.Statement; +import java.sql.Struct; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executor; +import java.util.logging.Logger; + +import static java.sql.ResultSet.CONCUR_READ_ONLY; +import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT; +import static java.sql.ResultSet.TYPE_FORWARD_ONLY; +import static org.apache.ignite.IgniteJdbcThinDriver.PROP_DISTRIBUTED_JOINS; +import static org.apache.ignite.IgniteJdbcThinDriver.PROP_ENFORCE_JOIN_ORDER; +import static org.apache.ignite.IgniteJdbcThinDriver.PROP_HOST; +import static org.apache.ignite.IgniteJdbcThinDriver.PROP_PORT; + +/** + * JDBC connection implementation. + * + * See documentation of {@link org.apache.ignite.IgniteJdbcThinDriver} for details. + */ +public class JdbcConnection implements Connection { + /** Logger. */ + private static final Logger LOG = Logger.getLogger(JdbcConnection.class.getName()); + + /** Cache name. */ + private String schemaName; + + /** Closed flag. */ + private boolean closed; + + /** Current transaction isolation. */ + private int txIsolation; + + /** Auto commit flag. */ + private boolean autoCommit; + + /** Current transaction holdability. */ + private int holdability; + + /** Timeout. */ + private int timeout; + + /** Ignite endpoint. */ + private JdbcTcpIo cliIo; + + /** + * Creates new connection. + * + * @param url Connection URL. + * @param props Additional properties. + * @throws SQLException In case Ignite client failed to start. 
+ */ + public JdbcConnection(String url, Properties props) throws SQLException { + assert url != null; + assert props != null; + + holdability = HOLD_CURSORS_OVER_COMMIT; + autoCommit = true; + txIsolation = Connection.TRANSACTION_NONE; + + boolean distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS, "false")); + boolean enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER, "false")); + + String host = props.getProperty(PROP_HOST); + String portStr = props.getProperty(PROP_PORT); + + try { + int port = Integer.parseInt(portStr); + + if (port <= 0 || port > 0xFFFF) + throw new SQLException("Invalid port: " + portStr); + } + catch (NumberFormatException e) { + throw new SQLException("Invalid port: " + portStr, e); + } + + if (host == null || host.trim().isEmpty()) + throw new SQLException("Host name is empty."); + + String endpoint = host.trim() + ":" + portStr.trim(); + + try { + cliIo = new JdbcTcpIo(endpoint, distributedJoins, enforceJoinOrder); + + cliIo.start(); + } + catch (Exception e) { + cliIo.close(); + + throw new SQLException("Failed to connect to Ignite cluster [host=" + host + ", port=" + portStr + ']', e); + } + } + + /** {@inheritDoc} */ + @Override public Statement createStatement() throws SQLException { + return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT); + } + + /** {@inheritDoc} */ + @Override public Statement createStatement(int resSetType, int resSetConcurrency) throws SQLException { + return createStatement(resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT); + } + + /** {@inheritDoc} */ + @Override public Statement createStatement(int resSetType, int resSetConcurrency, + int resSetHoldability) throws SQLException { + ensureNotClosed(); + + checkCursorOptions(resSetType, resSetConcurrency, resSetHoldability); + + return null; + } + + /** {@inheritDoc} */ + @Override public PreparedStatement prepareStatement(String sql) throws SQLException { + return prepareStatement(sql, TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement prepareStatement(String sql, int resSetType, + int resSetConcurrency) throws SQLException { + return prepareStatement(sql, resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement prepareStatement(String sql, int resSetType, int resSetConcurrency, + int resSetHoldability) throws SQLException { + ensureNotClosed(); + + checkCursorOptions(resSetType, resSetConcurrency, resSetHoldability); + + return null; + } + + /** + * @param resSetType Cursor option. + * @param resSetConcurrency Cursor option. + * @param resSetHoldability Cursor option. + * @throws SQLException If options unsupported. 
+ */ + private void checkCursorOptions(int resSetType, int resSetConcurrency, + int resSetHoldability) throws SQLException { + if (resSetType != TYPE_FORWARD_ONLY) + throw new SQLFeatureNotSupportedException("Invalid result set type (only forward is supported.)"); + + if (resSetConcurrency != CONCUR_READ_ONLY) + throw new SQLFeatureNotSupportedException("Invalid concurrency (updates are not supported)."); + + if (resSetHoldability != HOLD_CURSORS_OVER_COMMIT) + LOG.warning("Transactions are not supported."); + } + + /** {@inheritDoc} */ + @Override public CallableStatement prepareCall(String sql) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Callable functions are not supported."); + } + + /** {@inheritDoc} */ + @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency) + throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Callable functions are not supported."); + } + + /** {@inheritDoc} */ + @Override public String nativeSQL(String sql) throws SQLException { + ensureNotClosed(); + + return sql; + } + + /** {@inheritDoc} */ + @Override public void setAutoCommit(boolean autoCommit) throws SQLException { + ensureNotClosed(); + + this.autoCommit = autoCommit; + + if (!autoCommit) + LOG.warning("Transactions are not supported."); + } + + /** {@inheritDoc} */ + @Override public boolean getAutoCommit() throws SQLException { + ensureNotClosed(); + + if (!autoCommit) + LOG.warning("Transactions are not supported."); + + return autoCommit; + } + + /** {@inheritDoc} */ + @Override public void commit() throws SQLException { + ensureNotClosed(); + + LOG.warning("Transactions are not supported."); + } + + /** {@inheritDoc} */ + @Override public void rollback() throws SQLException { + ensureNotClosed(); + + LOG.warning("Transactions are not supported."); + } + + /** {@inheritDoc} */ + @Override public void close() throws SQLException { + if (closed) + return; + + closed = true; + + cliIo.close(); + } + + /** {@inheritDoc} */ + @Override public boolean isClosed() throws SQLException { + return closed; + } + + /** {@inheritDoc} */ + @Override public DatabaseMetaData getMetaData() throws SQLException { + ensureNotClosed(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void setReadOnly(boolean readOnly) throws SQLException { + ensureNotClosed(); + } + + /** {@inheritDoc} */ + @Override public boolean isReadOnly() throws SQLException { + ensureNotClosed(); + + return true; + } + + /** {@inheritDoc} */ + @Override public void setCatalog(String catalog) throws SQLException { + ensureNotClosed(); + } + + /** {@inheritDoc} */ + @Override public String getCatalog() throws SQLException { + ensureNotClosed(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void setTransactionIsolation(int level) throws SQLException { + ensureNotClosed(); + + LOG.warning("Transactions are not supported."); + + txIsolation = level; + } + + /** {@inheritDoc} */ + @Override public int getTransactionIsolation() throws SQLException { + ensureNotClosed(); + + LOG.warning("Transactions are not supported."); + + return txIsolation; + } + + /** {@inheritDoc} */ + @Override public SQLWarning getWarnings() throws SQLException { + ensureNotClosed(); + + return null; + } + + /** {@inheritDoc} */ + @Override public void clearWarnings() throws SQLException { + ensureNotClosed(); + } + + /** {@inheritDoc} */ + @Override public Map> getTypeMap() throws SQLException { + throw new 
SQLFeatureNotSupportedException("Types mapping is not supported."); + } + + /** {@inheritDoc} */ + @Override public void setTypeMap(Map> map) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Types mapping is not supported."); + } + + /** {@inheritDoc} */ + @Override public void setHoldability(int holdability) throws SQLException { + ensureNotClosed(); + + if (holdability != HOLD_CURSORS_OVER_COMMIT) + LOG.warning("Transactions are not supported."); + + this.holdability = holdability; + } + + /** {@inheritDoc} */ + @Override public int getHoldability() throws SQLException { + ensureNotClosed(); + + return holdability; + } + + /** {@inheritDoc} */ + @Override public Savepoint setSavepoint() throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Savepoints are not supported."); + } + + /** {@inheritDoc} */ + @Override public Savepoint setSavepoint(String name) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Savepoints are not supported."); + } + + /** {@inheritDoc} */ + @Override public void rollback(Savepoint savepoint) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Savepoints are not supported."); + } + + /** {@inheritDoc} */ + @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Savepoints are not supported."); + } + + /** {@inheritDoc} */ + @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency, + int resSetHoldability) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Callable functions are not supported."); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Auto generated keys are not supported."); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement prepareStatement(String sql, int[] colIndexes) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Auto generated keys are not supported."); + } + + /** {@inheritDoc} */ + @Override public PreparedStatement prepareStatement(String sql, String[] colNames) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("Auto generated keys are not supported."); + } + + /** {@inheritDoc} */ + @Override public Clob createClob() throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); + } + + /** {@inheritDoc} */ + @Override public Blob createBlob() throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); + } + + /** {@inheritDoc} */ + @Override public NClob createNClob() throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); + } + + /** {@inheritDoc} */ + @Override public SQLXML createSQLXML() throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); + } + + /** {@inheritDoc} */ + @Override public boolean isValid(int timeout) throws SQLException { + if (timeout < 0) + throw new SQLException("Invalid timeout: " + timeout); + + return !closed; + } + + /** {@inheritDoc} */ + @Override public 
void setClientInfo(String name, String val) throws SQLClientInfoException { + throw new UnsupportedOperationException("Client info is not supported."); + } + + /** {@inheritDoc} */ + @Override public void setClientInfo(Properties props) throws SQLClientInfoException { + throw new UnsupportedOperationException("Client info is not supported."); + } + + /** {@inheritDoc} */ + @Override public String getClientInfo(String name) throws SQLException { + ensureNotClosed(); + + return null; + } + + /** {@inheritDoc} */ + @Override public Properties getClientInfo() throws SQLException { + ensureNotClosed(); + + return new Properties(); + } + + /** {@inheritDoc} */ + @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); + } + + /** {@inheritDoc} */ + @Override public Struct createStruct(String typeName, Object[] attrs) throws SQLException { + ensureNotClosed(); + + throw new SQLFeatureNotSupportedException("SQL-specific types are not supported."); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public T unwrap(Class iface) throws SQLException { + if (!isWrapperFor(iface)) + throw new SQLException("Connection is not a wrapper for " + iface.getName()); + + return (T)this; + } + + /** {@inheritDoc} */ + @Override public boolean isWrapperFor(Class iface) throws SQLException { + return iface != null && iface == Connection.class; + } + + /** {@inheritDoc} */ + @Override public void setSchema(String schema) throws SQLException { + schemaName = schema; + } + + /** {@inheritDoc} */ + @Override public String getSchema() throws SQLException { + return schemaName; + } + + /** {@inheritDoc} */ + @Override public void abort(Executor executor) throws SQLException { + close(); + } + + /** {@inheritDoc} */ + @Override public void setNetworkTimeout(Executor executor, int ms) throws SQLException { + if (ms < 0) + throw new IllegalArgumentException("Timeout is below zero: " + ms); + + timeout = ms; + } + + /** {@inheritDoc} */ + @Override public int getNetworkTimeout() throws SQLException { + return timeout; + } + + /** + * Ensures that connection is not closed. + * + * @throws SQLException If connection is closed. + */ + private void ensureNotClosed() throws SQLException { + if (closed) + throw new SQLException("Connection is closed."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java new file mode 100644 index 0000000..4946b41 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.jdbc.thin; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.util.logging.Logger; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.processors.odbc.SqlListenerProtocolVersion; +import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; +import org.apache.ignite.internal.processors.odbc.SqlListenerNioListener; +import org.apache.ignite.internal.util.ipc.IpcEndpoint; +import org.apache.ignite.internal.util.ipc.IpcEndpointFactory; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * JDBC IO layer implementation based on blocking IPC streams. + */ +public class JdbcTcpIo { + /** Current version. */ + private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0); + + /** Initial output stream capacity. */ + private static final int HANDSHAKE_MSG_SIZE = 10; + + /** Logger. */ + private static final Logger log = Logger.getLogger(JdbcTcpIo.class.getName()); + + /** Server endpoint address. */ + private final String endpointAddr; + + /** Endpoint. */ + private IpcEndpoint endpoint; + + /** Output stream. */ + private BufferedOutputStream out; + + /** Input stream. */ + private BufferedInputStream in; + + /** Distributed joins. */ + private boolean distributedJoins; + + /** Enforce join order. */ + private boolean enforceJoinOrder; + + /** Closed flag. */ + private boolean closed; + + /** + * @param endpointAddr Endpoint. + * @param distributedJoins Distributed joins flag. + * @param enforceJoinOrder Enforce join order flag. + */ + JdbcTcpIo(String endpointAddr, boolean distributedJoins, boolean enforceJoinOrder) { + assert endpointAddr != null; + + this.endpointAddr = endpointAddr; + this.distributedJoins = distributedJoins; + this.enforceJoinOrder= enforceJoinOrder; + } + + /** + * @throws IgniteCheckedException On error. + * @throws IOException On IO error in handshake. + */ + public void start() throws IgniteCheckedException, IOException { + endpoint = IpcEndpointFactory.connectEndpoint(endpointAddr, null); + + out = new BufferedOutputStream(endpoint.outputStream()); + in = new BufferedInputStream(endpoint.inputStream()); + + handshake(); + } + + /** + * @throws IOException On error. + * @throws IgniteCheckedException On error. 
+ */ + public void handshake() throws IOException, IgniteCheckedException { + BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE), + null, null); + + writer.writeByte((byte)SqlListenerRequest.HANDSHAKE); + + writer.writeShort(CURRENT_VER.major()); + writer.writeShort(CURRENT_VER.minor()); + writer.writeShort(CURRENT_VER.maintenance()); + + writer.writeByte(SqlListenerNioListener.JDBC_CLIENT); + + writer.writeBoolean(distributedJoins); + writer.writeBoolean(enforceJoinOrder); + + send(writer.array()); + + BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()), + null, null, false); + + boolean accepted = reader.readBoolean(); + + if (accepted) + return; + + short maj = reader.readShort(); + short min = reader.readShort(); + short maintenance = reader.readShort(); + + String err = reader.readString(); + + SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(maj, min, maintenance); + + throw new IgniteCheckedException("Handshake failed [driverProtocolVer=" + CURRENT_VER + + ", remoteNodeProtocolVer=" + ver + ", err=" + err + ']'); + } + + /** + * @param req ODBC request. + * @throws IOException On error. + */ + private void send(byte[] req) throws IOException { + int size = req.length; + + out.write(size & 0xFF); + out.write((size >> 8) & 0xFF); + out.write((size >> 16) & 0xFF); + out.write((size >> 24) & 0xFF); + + out.write(req); + + out.flush(); + } + + /** + * @return Bytes of a response from server. + * @throws IOException On error. + * @throws IgniteCheckedException On error. + */ + private byte[] read() throws IOException, IgniteCheckedException { + byte[] sizeBytes = read(4); + + int msgSize = (((0xFF & sizeBytes[3]) << 24) | ((0xFF & sizeBytes[2]) << 16) + | ((0xFF & sizeBytes[1]) << 8) + (0xFF & sizeBytes[0])); + + return read(msgSize); + } + + /** + * @param size Count of bytes to read from stream. + * @return Read bytes. + * @throws IOException On error. + * @throws IgniteCheckedException On error. + */ + private byte [] read(int size) throws IOException, IgniteCheckedException { + int off = 0; + + byte[] data = new byte[size]; + + while (off != size) { + int res = in.read(data, off, size - off); + + if (res == -1) + throw new IgniteCheckedException("Failed to read incoming message (not enough data)."); + + off += res; + } + + return data; + } + + /** + * Close the client IO. + */ + public void close() { + if (closed) + return; + + // Clean up resources. + U.closeQuiet(out); + U.closeQuiet(in); + + if (endpoint != null) + endpoint.close(); + + closed = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java deleted file mode 100644 index cdb3de3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.odbc; - -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.binary.BinaryReaderExImpl; -import org.apache.ignite.internal.binary.BinaryWriterExImpl; -import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; -import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; -import org.apache.ignite.internal.binary.streams.BinaryInputStream; -import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser; -import org.apache.ignite.internal.processors.odbc.odbc.OdbcRequestHandler; -import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; -import org.apache.ignite.internal.util.nio.GridNioSession; -import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; -import org.jetbrains.annotations.Nullable; - -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; - -/** - * ODBC message listener. - */ -public class OdbcNioListener extends GridNioServerListenerAdapter { - /** Current version. */ - private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0); - - /** Supported versions. */ - private static final Set SUPPORTED_VERS = new HashSet<>(); - - /** Connection-related metadata key. */ - private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - - /** Request ID generator. */ - private static final AtomicLong REQ_ID_GEN = new AtomicLong(); - - /** Busy lock. */ - private final GridSpinBusyLock busyLock; - - /** Kernal context. */ - private final GridKernalContext ctx; - - /** Maximum allowed cursors. */ - private final int maxCursors; - - /** Logger. */ - private final IgniteLogger log; - - static { - SUPPORTED_VERS.add(CURRENT_VER); - } - - /** - * Constructor. - * - * @param ctx Context. - * @param busyLock Shutdown busy lock. - * @param maxCursors Maximum allowed cursors. 
- */ - public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) { - this.ctx = ctx; - this.busyLock = busyLock; - this.maxCursors = maxCursors; - - log = ctx.log(getClass()); - } - - /** {@inheritDoc} */ - @Override public void onConnected(GridNioSession ses) { - if (log.isDebugEnabled()) - log.debug("SQL client connected: " + ses.remoteAddress()); - } - - /** {@inheritDoc} */ - @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - if (log.isDebugEnabled()) { - if (e == null) - log.debug("SQL client disconnected: " + ses.remoteAddress()); - else - log.debug("SQL client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']'); - } - } - - /** {@inheritDoc} */ - @Override public void onMessage(GridNioSession ses, byte[] msg) { - assert msg != null; - - SqlListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY); - - if (connCtx == null) { - onHandshake(ses, msg); - - return; - } - - SqlListenerMessageParser parser = connCtx.parser(); - - SqlListenerRequest req; - - try { - req = parser.decode(msg); - } - catch (Exception e) { - log.error("Failed to parse SQL client request [err=" + e + ']'); - - ses.close(); - - return; - } - - assert req != null; - - req.requestId(REQ_ID_GEN.incrementAndGet()); - - try { - long startTime = 0; - - if (log.isDebugEnabled()) { - startTime = System.nanoTime(); - - log.debug("SQL client request received [reqId=" + req.requestId() + ", addr=" + ses.remoteAddress() + - ", req=" + req + ']'); - } - - SqlListenerRequestHandler handler = connCtx.handler(); - - SqlListenerResponse resp = handler.handle(req); - - if (log.isDebugEnabled()) { - long dur = (System.nanoTime() - startTime) / 1000; - - log.debug("SQL client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur + - ", resp=" + resp.status() + ']'); - } - - byte[] outMsg = parser.encode(resp); - - ses.send(outMsg); - } - catch (Exception e) { - log.error("Failed to process SQL client request [reqId=" + req.requestId() + ", err=" + e + ']'); - - ses.send(parser.encode(new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString()))); - } - } - - /** - * Perform handshake. - * - * @param ses Session. - * @param msg Message bytes. - */ - private void onHandshake(GridNioSession ses, byte[] msg) { - BinaryInputStream stream = new BinaryHeapInputStream(msg); - - BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true); - - byte cmd = reader.readByte(); - - if (cmd != SqlListenerRequest.HANDSHAKE) { - log.error("Unexpected SQL client request (will close session): " + ses.remoteAddress()); - - ses.close(); - - return; - } - - short verMajor = reader.readShort(); - short verMinor = reader.readShort(); - short verMaintenance = reader.readShort(); - - SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(verMajor, verMinor, verMaintenance); - - String errMsg = null; - - if (SUPPORTED_VERS.contains(ver)) { - // Prepare context. - SqlListenerConnectionContext connCtx = prepareContext(ver, reader); - - ses.addMeta(CONN_CTX_META_KEY, connCtx); - } - else { - log.warning("Unsupported version: " + ver.toString()); - - errMsg = "Unsupported version."; - } - - // Send response. 
- BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null); - - if (errMsg == null) { - writer.writeBoolean(true); - } - else { - writer.writeBoolean(false); - writer.writeShort(CURRENT_VER.major()); - writer.writeShort(CURRENT_VER.minor()); - writer.writeShort(CURRENT_VER.maintenance()); - writer.doWriteString(errMsg); - } - - ses.send(writer.array()); - } - - /** - * Prepare context. - * - * @param ver Version. - * @param reader Reader. - * @return Context. - */ - private SqlListenerConnectionContext prepareContext(SqlListenerProtocolVersion ver, BinaryReaderExImpl reader) { - // TODO: Switch between ODBC and JDBC. - boolean distributedJoins = reader.readBoolean(); - boolean enforceJoinOrder = reader.readBoolean(); - - OdbcRequestHandler handler = - new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder); - - OdbcMessageParser parser = new OdbcMessageParser(ctx); - - return new SqlListenerConnectionContext(handler, parser); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java deleted file mode 100644 index 6b8b5a3..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.odbc; - -import java.net.InetAddress; -import java.nio.ByteOrder; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.LinkedBlockingQueue; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.OdbcConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.binary.BinaryMarshaller; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.HostAndPortRange; -import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter; -import org.apache.ignite.internal.util.nio.GridNioCodecFilter; -import org.apache.ignite.internal.util.nio.GridNioFilter; -import org.apache.ignite.internal.util.nio.GridNioServer; -import org.apache.ignite.internal.util.nio.GridNioSession; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.marshaller.Marshaller; -import org.apache.ignite.spi.IgnitePortProtocol; -import org.apache.ignite.thread.IgniteThreadPoolExecutor; - -/** - * ODBC processor. - */ -public class OdbcProcessor extends GridProcessorAdapter { - /** Default number of selectors. */ - private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); - - /** Default TCP_NODELAY flag. */ - private static final boolean DFLT_TCP_NODELAY = true; - - /** Default TCP direct buffer flag. */ - private static final boolean DFLT_TCP_DIRECT_BUF = false; - - /** Busy lock. */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); - - /** ODBC TCP Server. */ - private GridNioServer srv; - - /** ODBC executor service. */ - private ExecutorService odbcExecSvc; - - /** - * @param ctx Kernal context. 
- */ - public OdbcProcessor(GridKernalContext ctx) { - super(ctx); - } - - /** {@inheritDoc} */ - @Override public void start(boolean activeOnStart) throws IgniteCheckedException { - IgniteConfiguration cfg = ctx.config(); - - OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration(); - - if (odbcCfg != null) { - try { - Marshaller marsh = cfg.getMarshaller(); - - if (marsh != null && !(marsh instanceof BinaryMarshaller)) - throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " + - "through IgniteConfiguration.setMarshaller())"); - - HostAndPortRange hostPort; - - if (F.isEmpty(odbcCfg.getEndpointAddress())) { - hostPort = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST, - OdbcConfiguration.DFLT_TCP_PORT_FROM, - OdbcConfiguration.DFLT_TCP_PORT_TO - ); - } - else { - hostPort = HostAndPortRange.parse(odbcCfg.getEndpointAddress(), - OdbcConfiguration.DFLT_TCP_PORT_FROM, - OdbcConfiguration.DFLT_TCP_PORT_TO, - "Failed to parse ODBC endpoint address" - ); - } - - assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0"); - - odbcExecSvc = new IgniteThreadPoolExecutor( - "odbc", - cfg.getIgniteInstanceName(), - odbcCfg.getThreadPoolSize(), - odbcCfg.getThreadPoolSize(), - 0, - new LinkedBlockingQueue()); - - InetAddress host; - - try { - host = InetAddress.getByName(hostPort.host()); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to resolve ODBC host: " + hostPort.host(), e); - } - - Exception lastErr = null; - - for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) { - try { - GridNioFilter[] filters = new GridNioFilter[] { - new GridNioAsyncNotifyFilter(ctx.igniteInstanceName(), odbcExecSvc, log) { - @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException { - proceedSessionOpened(ses); - } - }, - new GridNioCodecFilter(new SqlListenerBufferedParser(), log, false) - }; - - GridNioServer srv0 = GridNioServer.builder() - .address(host) - .port(port) - .listener(new OdbcNioListener(ctx, busyLock, odbcCfg.getMaxOpenCursors())) - .logger(log) - .selectorCount(DFLT_SELECTOR_CNT) - .igniteInstanceName(ctx.igniteInstanceName()) - .serverName("odbc") - .tcpNoDelay(DFLT_TCP_NODELAY) - .directBuffer(DFLT_TCP_DIRECT_BUF) - .byteOrder(ByteOrder.nativeOrder()) - .socketSendBufferSize(odbcCfg.getSocketSendBufferSize()) - .socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize()) - .filters(filters) - .directMode(false) - .build(); - - srv0.start(); - - srv = srv0; - - ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); - - log.info("ODBC processor has started on TCP port " + port); - - lastErr = null; - - break; - } - catch (Exception e) { - lastErr = e; - } - } - - assert (srv != null && lastErr == null) || (srv == null && lastErr != null); - - if (lastErr != null) - throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" + - "address=" + hostPort + ", lastErr=" + lastErr + ']'); - } - catch (Exception e) { - throw new IgniteCheckedException("Failed to start ODBC processor.", e); - } - } - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - if (srv != null) { - busyLock.block(); - - srv.stop(); - - ctx.ports().deregisterPorts(getClass()); - - if (odbcExecSvc != null) { - U.shutdownNow(getClass(), odbcExecSvc, log); - - odbcExecSvc = null; - } - - if (log.isDebugEnabled()) - log.debug("ODBC processor stopped."); - } - } -} 
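As a reading aid: the deleted OdbcNioListener.onHandshake() above reads a one-byte command, three shorts carrying the client protocol version, and two booleans consumed by prepareContext() (distributedJoins, enforceJoinOrder), then answers with a success flag or with the server version and an error string. Below is a minimal client-side sketch of composing such a handshake payload with the same binary classes the listener itself uses; it is not taken from this commit, the class name HandshakeSketch and method handshakeBytes are purely illustrative, the version constants follow CURRENT_VER (2.1.0) above, and the length framing applied on the wire by SqlListenerBufferedParser is deliberately omitted.

package org.apache.ignite.internal.processors.odbc;

import org.apache.ignite.internal.binary.BinaryWriterExImpl;
import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;

/** Illustrative sketch only: composes the handshake payload expected by onHandshake() above. */
public class HandshakeSketch {
    /** Returns handshake bytes for protocol version 2.1.0 (CURRENT_VER above); wire framing is omitted. */
    public static byte[] handshakeBytes(boolean distributedJoins, boolean enforceJoinOrder) {
        BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);

        writer.writeByte((byte)SqlListenerRequest.HANDSHAKE); // Command byte, checked first by onHandshake().
        writer.writeShort((short)2);                          // Major version.
        writer.writeShort((short)1);                          // Minor version.
        writer.writeShort((short)0);                          // Maintenance version.
        writer.writeBoolean(distributedJoins);                // Read by prepareContext().
        writer.writeBoolean(enforceJoinOrder);                // Read by prepareContext().

        return writer.array();
    }
}

On success the listener replies with a single boolean true; on a version mismatch it writes false, its own version and an error message, as the response-writing branch above shows.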
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java new file mode 100644 index 0000000..9d731ab --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import java.util.Collection; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; + +/** + * ODBC message parser. + */ +public abstract class SqlListenerAbstractMessageParser implements SqlListenerMessageParser { + /** Initial output stream capacity. */ + protected static final int INIT_CAP = 1024; + + /** Kernal context. */ + protected GridKernalContext ctx; + + /** Logger. */ + private final IgniteLogger log; + + /** Object reader. */ + private SqlListenerAbstractObjectReader objReader; + + /** Object writer. */ + private SqlListenerAbstractObjectWriter objWriter; + + /** + * @param ctx Context. + * @param objReader Object reader. + * @param objWriter Object writer. 
+ */ + protected SqlListenerAbstractMessageParser(final GridKernalContext ctx, SqlListenerAbstractObjectReader objReader, + SqlListenerAbstractObjectWriter objWriter) { + this.ctx = ctx; + + log = ctx.log(getClass()); + + this.objReader = objReader; + this.objWriter = objWriter; + } + + /** {@inheritDoc} */ + @Override public SqlListenerRequest decode(byte[] msg) { + assert msg != null; + + BinaryReaderExImpl reader = createReader(msg); + + byte cmd = reader.readByte(); + + SqlListenerRequest res; + + switch (cmd) { + case SqlListenerRequest.QRY_EXEC: { + String cache = reader.readString(); + String sql = reader.readString(); + int argsNum = reader.readInt(); + + Object[] params = new Object[argsNum]; + + for (int i = 0; i < argsNum; ++i) + params[i] = objReader.readObject(reader); + + res = new SqlListenerQueryExecuteRequest(cache, sql, params); + + break; + } + + case SqlListenerRequest.QRY_FETCH: { + long queryId = reader.readLong(); + int pageSize = reader.readInt(); + + res = new SqlListenerQueryFetchRequest(queryId, pageSize); + + break; + } + + case SqlListenerRequest.QRY_CLOSE: { + long queryId = reader.readLong(); + + res = new SqlListenerQueryCloseRequest(queryId); + + break; + } + + case SqlListenerRequest.META_COLS: { + String cache = reader.readString(); + String table = reader.readString(); + String column = reader.readString(); + + res = new OdbcQueryGetColumnsMetaRequest(cache, table, column); + + break; + } + + case SqlListenerRequest.META_TBLS: { + String catalog = reader.readString(); + String schema = reader.readString(); + String table = reader.readString(); + String tableType = reader.readString(); + + res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType); + + break; + } + + case SqlListenerRequest.META_PARAMS: { + String cacheName = reader.readString(); + String sqlQuery = reader.readString(); + + res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery); + + break; + } + + default: + throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']'); + } + + return res; + } + + /** {@inheritDoc} */ + @Override public byte[] encode(SqlListenerResponse msg) { + assert msg != null; + + // Creating new binary writer + BinaryWriterExImpl writer = createWriter(INIT_CAP); + + // Writing status. 
+ writer.writeByte((byte) msg.status()); + + if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) { + writer.writeString(msg.error()); + + return writer.array(); + } + + Object res0 = msg.response(); + + if (res0 == null) + return writer.array(); + else if (res0 instanceof SqlListenerQueryExecuteResult) { + SqlListenerQueryExecuteResult res = (SqlListenerQueryExecuteResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.getQueryId()); + + writer.writeLong(res.getQueryId()); + + Collection metas = res.getColumnsMetadata(); + + assert metas != null; + + writer.writeInt(metas.size()); + + for (SqlListenerColumnMeta meta : metas) + meta.write(writer); + } + else if (res0 instanceof SqlListenerQueryFetchResult) { + SqlListenerQueryFetchResult res = (SqlListenerQueryFetchResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.queryId()); + + writer.writeLong(res.queryId()); + + Collection items0 = res.items(); + + assert items0 != null; + + writer.writeBoolean(res.last()); + + writer.writeInt(items0.size()); + + for (Object row0 : items0) { + if (row0 != null) { + Collection row = (Collection)row0; + + writer.writeInt(row.size()); + + for (Object obj : row) + objWriter.writeObject(writer, obj); + } + } + } + else if (res0 instanceof SqlListenerQueryCloseResult) { + SqlListenerQueryCloseResult res = (SqlListenerQueryCloseResult) res0; + + if (log.isDebugEnabled()) + log.debug("Resulting query ID: " + res.getQueryId()); + + writer.writeLong(res.getQueryId()); + } + else if (res0 instanceof OdbcQueryGetColumnsMetaResult) { + OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0; + + Collection columnsMeta = res.meta(); + + assert columnsMeta != null; + + writer.writeInt(columnsMeta.size()); + + for (SqlListenerColumnMeta columnMeta : columnsMeta) + columnMeta.write(writer); + } + else if (res0 instanceof OdbcQueryGetTablesMetaResult) { + OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0; + + Collection tablesMeta = res.meta(); + + assert tablesMeta != null; + + writer.writeInt(tablesMeta.size()); + + for (OdbcTableMeta tableMeta : tablesMeta) + tableMeta.writeBinary(writer); + } + else if (res0 instanceof OdbcQueryGetParamsMetaResult) { + OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0; + + byte[] typeIds = res.typeIds(); + + objWriter.writeObject(writer, typeIds); + } + else + assert false : "Should not reach here."; + + return writer.array(); + } + + /** + * Create reader. + * + * @param msg Input message. + * @return Reader. + */ + protected abstract BinaryReaderExImpl createReader(byte[] msg); + + /** + * Create writer. + * + * @param cap Initial capacity. + * @return Binary writer instance. 
+     */
+    protected abstract BinaryWriterExImpl createWriter(int cap);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
new file mode 100644
index 0000000..18162e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Binary reader with marshaling non-primitive and non-embedded objects with JDK marshaller.
+ */
+@SuppressWarnings("unchecked")
+public abstract class SqlListenerAbstractObjectReader {
+    /**
+     * @param reader Reader.
+     * @return Read object.
+     * @throws BinaryObjectException On error.
+     */
+    @Nullable public Object readObject(BinaryReaderExImpl reader) throws BinaryObjectException {
+        byte type = reader.readByte();
+
+        switch (type) {
+            case GridBinaryMarshaller.NULL:
+                return null;
+
+            case GridBinaryMarshaller.BOOLEAN:
+                return reader.readBoolean();
+
+            case GridBinaryMarshaller.BYTE:
+                return reader.readByte();
+
+            case GridBinaryMarshaller.CHAR:
+                return reader.readChar();
+
+            case GridBinaryMarshaller.SHORT:
+                return reader.readShort();
+
+            case GridBinaryMarshaller.INT:
+                return reader.readInt();
+
+            case GridBinaryMarshaller.LONG:
+                return reader.readLong();
+
+            case GridBinaryMarshaller.FLOAT:
+                return reader.readFloat();
+
+            case GridBinaryMarshaller.DOUBLE:
+                return reader.readDouble();
+
+            case GridBinaryMarshaller.STRING:
+                return BinaryUtils.doReadString(reader.in());
+
+            case GridBinaryMarshaller.DECIMAL:
+                return BinaryUtils.doReadDecimal(reader.in());
+
+            case GridBinaryMarshaller.UUID:
+                return BinaryUtils.doReadUuid(reader.in());
+
+            case GridBinaryMarshaller.TIME:
+                return BinaryUtils.doReadTime(reader.in());
+
+            case GridBinaryMarshaller.TIMESTAMP:
+                return BinaryUtils.doReadTimestamp(reader.in());
+
+            case GridBinaryMarshaller.DATE:
+                return BinaryUtils.doReadDate(reader.in());
+
+            case GridBinaryMarshaller.BOOLEAN_ARR:
+                return BinaryUtils.doReadBooleanArray(reader.in());
+
+            case GridBinaryMarshaller.BYTE_ARR:
+                return BinaryUtils.doReadByteArray(reader.in());
+
+            case GridBinaryMarshaller.CHAR_ARR:
+                return BinaryUtils.doReadCharArray(reader.in());
+
+            case GridBinaryMarshaller.SHORT_ARR:
+                return BinaryUtils.doReadShortArray(reader.in());
+
+            case GridBinaryMarshaller.INT_ARR:
+                return BinaryUtils.doReadIntArray(reader.in());
+
+            case GridBinaryMarshaller.FLOAT_ARR:
+                return BinaryUtils.doReadFloatArray(reader.in());
+
+            case GridBinaryMarshaller.DOUBLE_ARR:
+                return BinaryUtils.doReadDoubleArray(reader.in());
+
+            case GridBinaryMarshaller.STRING_ARR:
+                return BinaryUtils.doReadStringArray(reader.in());
+
+            case GridBinaryMarshaller.DECIMAL_ARR:
+                return BinaryUtils.doReadDecimalArray(reader.in());
+
+            case GridBinaryMarshaller.UUID_ARR:
+                return BinaryUtils.doReadUuidArray(reader.in());
+
+            case GridBinaryMarshaller.TIME_ARR:
+                return BinaryUtils.doReadTimeArray(reader.in());
+
+            case GridBinaryMarshaller.TIMESTAMP_ARR:
+                return BinaryUtils.doReadTimestampArray(reader.in());
+
+            case GridBinaryMarshaller.DATE_ARR:
+                return BinaryUtils.doReadDateArray(reader.in());
+
+            default:
+                reader.in().position(reader.in().position() - 1);
+
+                return readCustomObject(reader);
+        }
+    }
+
+    /**
+     * @param reader Reader.
+     * @return An object is unmarshaled by marshaller.
+     * @throws BinaryObjectException On error.
+     */
+    protected abstract Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException;
+}
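As a reading aid: SqlListenerAbstractObjectReader decodes all plain SQL types directly and rewinds the stream by one byte before delegating anything else to readCustomObject(). A hypothetical minimal subclass could simply reject such objects, as sketched below; the class name SqlTypesOnlyObjectReader is illustrative only, and the concrete OdbcObjectReader/JdbcObjectReader added elsewhere in this commit may handle custom objects differently.

package org.apache.ignite.internal.processors.odbc;

import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;

/** Hypothetical minimal reader that supports only the plain SQL types handled by the parent class. */
public class SqlTypesOnlyObjectReader extends SqlListenerAbstractObjectReader {
    /** {@inheritDoc} */
    @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException {
        // The parent class has already rewound the stream to the type byte; refuse anything non-primitive.
        throw new BinaryObjectException("Non-SQL (custom) object types are not supported by this reader.");
    }
}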