ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [21/41] ignite git commit: IGNITE-5163: Implemented infrastructure for the new JDBC driver. This closes #1912.
Date Wed, 24 May 2017 17:27:05 GMT
IGNITE-5163: Implemented infrastructure for the new JDBC driver. This closes #1912.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6f1dc3ac
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6f1dc3ac
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6f1dc3ac

Branch: refs/heads/ignite-5267
Commit: 6f1dc3ac65d403a634331515cd1f279010d0d092
Parents: c04b39a
Author: tledkov-gridgain <tledkov@gridgain.com>
Authored: Tue May 23 15:55:48 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Tue May 23 15:55:48 2017 +0300

----------------------------------------------------------------------
 .../jdbc/suite/IgniteJdbcDriverTestSuite.java   |   3 +
 .../jdbc/thin/JdbcConnectionSelfTest.java       | 195 +++++++
 .../org/apache/ignite/IgniteJdbcThinDriver.java | 312 +++++++++++
 .../ignite/internal/GridKernalContext.java      |   8 +-
 .../ignite/internal/GridKernalContextImpl.java  |  12 +-
 .../apache/ignite/internal/IgniteKernal.java    |   4 +-
 .../internal/binary/BinaryWriterExImpl.java     |  16 +-
 .../internal/jdbc/thin/JdbcConnection.java      | 529 +++++++++++++++++++
 .../ignite/internal/jdbc/thin/JdbcTcpIo.java    | 207 ++++++++
 .../processors/odbc/OdbcNioListener.java        | 242 ---------
 .../internal/processors/odbc/OdbcProcessor.java | 199 -------
 .../odbc/SqlListenerAbstractMessageParser.java  | 265 ++++++++++
 .../odbc/SqlListenerAbstractObjectReader.java   | 137 +++++
 .../odbc/SqlListenerAbstractObjectWriter.java   | 111 ++++
 .../processors/odbc/SqlListenerNioListener.java | 263 +++++++++
 .../processors/odbc/SqlListenerProcessor.java   | 191 +++++++
 .../odbc/SqlListenerRequestHandlerImpl.java     | 494 +++++++++++++++++
 .../processors/odbc/jdbc/JdbcMessageParser.java |  50 ++
 .../processors/odbc/jdbc/JdbcObjectReader.java  |  33 ++
 .../processors/odbc/jdbc/JdbcObjectWriter.java  |  33 ++
 .../processors/odbc/odbc/OdbcMessageParser.java | 249 +--------
 .../processors/odbc/odbc/OdbcObjectReader.java  |  33 ++
 .../processors/odbc/odbc/OdbcObjectWriter.java  |  32 ++
 .../odbc/odbc/OdbcRequestHandler.java           | 513 ------------------
 .../odbc/OdbcProcessorValidationSelfTest.java   | 182 -------
 .../SqlListenerProcessorValidationSelfTest.java | 184 +++++++
 .../ignite/testframework/GridTestUtils.java     |   2 +-
 .../ignite/testsuites/IgniteBasicTestSuite.java |   4 +-
 .../cpp/odbc/include/ignite/odbc/message.h      |  10 +
 29 files changed, 3124 insertions(+), 1389 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
index 75671de..e2f09ba 100644
--- a/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/suite/IgniteJdbcDriverTestSuite.java
@@ -81,6 +81,9 @@ public class IgniteJdbcDriverTestSuite extends TestSuite {
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalPartitionedSelfTest.class));
         suite.addTest(new TestSuite(org.apache.ignite.internal.jdbc2.JdbcDynamicIndexTransactionalReplicatedSelfTest.class));
 
+        // New thin JDBC
+        suite.addTest(new TestSuite(org.apache.ignite.jdbc.thin.JdbcConnectionSelfTest.class));
+
         return suite;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java
new file mode 100644
index 0000000..d7e2bef
--- /dev/null
+++ b/modules/clients/src/test/java/org/apache/ignite/jdbc/thin/JdbcConnectionSelfTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.jdbc.thin;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.concurrent.Callable;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.OdbcConfiguration;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Connection test.
+ */
+public class JdbcConnectionSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** URL prefix. */
+    private static final String URL_PREFIX = "jdbc:ignite:thin://";
+
+    /** Host. */
+    private static final String HOST = "127.0.0.1";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setCacheConfiguration(cacheConfiguration(DEFAULT_CACHE_NAME));
+
+        TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+        disco.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(disco);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+
+        cfg.setOdbcConfiguration(new OdbcConfiguration());
+
+        return cfg;
+    }
+
+    /**
+     * @param name Cache name.
+     * @return Cache configuration.
+     * @throws Exception In case of error.
+     */
+    private CacheConfiguration cacheConfiguration(@NotNull String name) throws Exception {
+        CacheConfiguration cfg = defaultCacheConfiguration();
+
+        cfg.setName(name);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        try {
+            Driver drv = DriverManager.getDriver("jdbc:ignite://");
+
+            if (drv != null)
+                DriverManager.deregisterDriver(drv);
+        } catch (SQLException ignored) {
+            // No-op.
+        }
+
+        startGridsMultiThreaded(2);
+
+        Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDefaults() throws Exception {
+        String url = URL_PREFIX + HOST;
+
+        assert DriverManager.getConnection(url) != null;
+        assert DriverManager.getConnection(url + "/") != null;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testInvalidUrls() throws Exception {
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection(URL_PREFIX + "127.0.0.1:80");
+
+                return null;
+            }
+        }, SQLException.class, "Failed to connect to Ignite cluster [host=127.0.0.1, port=80]");
+
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection("q");
+
+                return null;
+            }
+        }, SQLException.class, "URL is invalid");
+
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection(URL_PREFIX + "127.0.0.1:-1");
+
+                return null;
+            }
+        }, SQLException.class, "Invalid port:");
+
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection(URL_PREFIX + "127.0.0.1:0");
+
+                return null;
+            }
+        }, SQLException.class, "Invalid port:");
+
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection(URL_PREFIX + "127.0.0.1:100000");
+
+                return null;
+            }
+        }, SQLException.class, "Invalid port:");
+
+        GridTestUtils.assertThrowsAnyCause(log, new Callable<Void>() {
+            @Override public Void call() throws Exception {
+                DriverManager.getConnection(URL_PREFIX + "     :10000");
+
+                return null;
+            }
+        }, SQLException.class, "Host name is empty");
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testClose() throws Exception {
+        String url = URL_PREFIX + HOST;
+
+        final Connection conn = DriverManager.getConnection(url);
+
+        assert conn != null;
+        assert !conn.isClosed();
+
+        conn.close();
+
+        assert conn.isClosed();
+
+        assert !conn.isValid(2): "Connection must be closed";
+
+        GridTestUtils.assertThrows(
+            log,
+            new Callable<Object>() {
+                @Override public Object call() throws Exception {
+                    conn.isValid(-2);
+
+                    return null;
+                }
+            },
+            SQLException.class,
+            "Invalid timeout"
+        );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
new file mode 100644
index 0000000..19e1edd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteJdbcThinDriver.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverManager;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Logger;
+import org.apache.ignite.cache.affinity.AffinityKey;
+import org.apache.ignite.configuration.OdbcConfiguration;
+import org.apache.ignite.internal.IgniteVersionUtils;
+import org.apache.ignite.internal.jdbc.JdbcDriverPropertyInfo;
+import org.apache.ignite.internal.jdbc.thin.JdbcConnection;
+
+/**
+ * JDBC driver thin implementation for In-Memory Data Grid.
+ * <p>
+ * Driver allows to get distributed data from Ignite cache using standard
+ * SQL queries and standard JDBC API. It will automatically get only fields that
+ * you actually need from objects stored in cache.
+ * <h1 class="header">Limitations</h1>
+ * Data in Ignite cache is usually distributed across several nodes,
+ * so some queries may not work as expected since the query will be sent to each
+ * individual node and results will then be collected and returned as JDBC result set.
+ * Keep in mind following limitations (not applied if data is queried from one node only,
+ * or data is fully co-located or fully replicated on multiple nodes):
+ * <ul>
+ *     <li>
+ *         Joins will work correctly only if joined objects are stored in
+ *         collocated mode. Refer to
+ *         {@link AffinityKey}
+ *         javadoc for more details.
+ *     </li>
+ *     <li>
+ *         Note that if you are connected to local or replicated cache, all data will
+ *         be queried only on one node, not depending on what caches participate in
+ *         the query (some data from partitioned cache can be lost). And visa versa,
+ *         if you are connected to partitioned cache, data from replicated caches
+ *         will be duplicated.
+ *     </li>
+ * </ul>
+ * <h1 class="header">SQL Notice</h1>
+ * Driver allows to query data from several caches. Cache that driver is connected to is
+ * treated as default schema in this case. Other caches can be referenced by their names.\
+ *
+ * <h1 class="header">Dependencies</h1>
+ * JDBC driver is located in main Ignite JAR in {@code IGNITE_HOME/libs} folder.
+ * <h1 class="header">Configuration</h1>
+ *
+ * <p>
+ * JDBC connection URL has the following pattern:
+ * {@code jdbc:ignite://<hostname>:<port>/}<br>
+ * Note the following:
+ * <ul>
+ *     <li>Hostname is required.</li>
+ *     <li>If port is not defined, {@code 10800} is used (default for Ignite thin client).</li>
+ * </ul>
+ * Other properties can be defined in {@link Properties} object passed to
+ * {@link DriverManager#getConnection(String, Properties)} method:
+ * <table class="doctable">
+ *     <tr>
+ *         <th>Name</th>
+ *         <th>Description</th>
+ *         <th>Default</th>
+ *         <th>Optional</th>
+ *     </tr>
+ *     <tr>
+ *         <td><b>ignite.jdbc.distributedJoins</b></td>
+ *         <td>Flag to enable distributed joins.</td>
+ *         <td>{@code false} (distributed joins are disabled)</td>
+ *         <td>Yes</td>
+ *     </tr>
+ *     <tr>
+ *         <td><b>ignite.jdbc.enforceJoinOrder</b></td>
+ *         <td>Flag to enforce join order of tables in the query.</td>
+ *         <td>{@code false} (enforcing join order is disabled)</td>
+ *         <td>Yes</td>
+ *     </tr>
+ * </table>
+ * <h1 class="header">Example</h1>
+ * <pre name="code" class="java">
+ * // Register JDBC driver.
+ * Class.forName("org.apache.ignite.IgniteJdbcThinDriver");
+ *
+ * // Open JDBC connection.
+ * Connection conn = DriverManager.getConnection("jdbc:ignite:thin//localhost:10800");
+ *
+ * // Query persons' names
+ * ResultSet rs = conn.createStatement().executeQuery("select name from Person");
+ *
+ * while (rs.next()) {
+ *     String name = rs.getString(1);
+ *
+ *     ...
+ * }
+ *
+ * // Query persons with specific age
+ * PreparedStatement stmt = conn.prepareStatement("select name, age from Person where age = ?");
+ *
+ * stmt.setInt(1, 30);
+ *
+ * ResultSet rs = stmt.executeQuery();
+ *
+ * while (rs.next()) {
+ *     String name = rs.getString("name");
+ *     int age = rs.getInt("age");
+ *
+ *     ...
+ * }
+ * </pre>
+ */
+@SuppressWarnings("JavadocReference")
+public class IgniteJdbcThinDriver implements Driver {
+    /** Prefix for property names. */
+    private static final String PROP_PREFIX = "ignite.jdbc";
+
+    /** Distributed joins parameter name. */
+    private static final String PARAM_DISTRIBUTED_JOINS = "distributedJoins";
+
+    /** Enforce join order parameter name. */
+    private static final String ENFORCE_JOIN_ORDER = "enforceJoinOrder";
+
+    /** Hostname property name. */
+    public static final String PROP_HOST = PROP_PREFIX + "host";
+
+    /** Port number property name. */
+    public static final String PROP_PORT = PROP_PREFIX + "port";
+
+    /** Distributed joins property name. */
+    public static final String PROP_DISTRIBUTED_JOINS = PROP_PREFIX + PARAM_DISTRIBUTED_JOINS;
+
+    /** Transactions allowed property name. */
+    public static final String PROP_ENFORCE_JOIN_ORDER = PROP_PREFIX + ENFORCE_JOIN_ORDER;
+
+    /** URL prefix. */
+    public static final String URL_PREFIX = "jdbc:ignite:thin://";
+
+    /** Default port. */
+    public static final int DFLT_PORT = OdbcConfiguration.DFLT_TCP_PORT_FROM;
+
+    /** Major version. */
+    private static final int MAJOR_VER = IgniteVersionUtils.VER.major();
+
+    /** Minor version. */
+    private static final int MINOR_VER = IgniteVersionUtils.VER.minor();
+
+    /*
+     * Register driver.
+     */
+    static {
+        try {
+            DriverManager.registerDriver(new IgniteJdbcThinDriver());
+        }
+        catch (SQLException e) {
+            throw new RuntimeException("Failed to register Ignite JDBC driver.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Connection connect(String url, Properties props) throws SQLException {
+        if (!parseUrl(url, props))
+            throw new SQLException("URL is invalid: " + url);
+
+        return new JdbcConnection(url, props);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean acceptsURL(String url) throws SQLException {
+        return url.startsWith(URL_PREFIX);
+    }
+
+    /** {@inheritDoc} */
+    @Override public DriverPropertyInfo[] getPropertyInfo(String url, Properties info) throws SQLException {
+        if (!parseUrl(url, info))
+            throw new SQLException("URL is invalid: " + url);
+
+        List<DriverPropertyInfo> props = Arrays.<DriverPropertyInfo>asList(
+            new JdbcDriverPropertyInfo("Hostname", info.getProperty(PROP_HOST), ""),
+            new JdbcDriverPropertyInfo("Port number", info.getProperty(PROP_PORT), ""),
+            new JdbcDriverPropertyInfo("Distributed Joins", info.getProperty(PROP_DISTRIBUTED_JOINS), ""),
+            new JdbcDriverPropertyInfo("Enforce Join Order", info.getProperty(PROP_ENFORCE_JOIN_ORDER), "")
+        );
+
+        return props.toArray(new DriverPropertyInfo[0]);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMajorVersion() {
+        return MAJOR_VER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getMinorVersion() {
+        return MINOR_VER;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean jdbcCompliant() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Logger getParentLogger() throws SQLFeatureNotSupportedException {
+        throw new SQLFeatureNotSupportedException("java.util.logging is not used.");
+    }
+
+    /**
+     * Validates and parses connection URL.
+     *
+     * @param props Properties.
+     * @param url URL.
+     * @return Whether URL is valid.
+     */
+    private boolean parseUrl(String url, Properties props) {
+        if (url == null)
+            return false;
+
+        if (url.startsWith(URL_PREFIX) && url.length() > URL_PREFIX.length())
+            return parseJdbcUrl(url, props);
+
+        return false;
+    }
+
+    /**
+     * @param url Url.
+     * @param props Properties.
+     * @return Whether URL is valid.
+     */
+    private boolean parseJdbcUrl(String url, Properties props) {
+        url = url.substring(URL_PREFIX.length());
+
+        String[] parts = url.split("\\?");
+
+        if (parts.length > 2)
+            return false;
+
+        if (parts.length == 2)
+            if (!parseParameters(parts[1], "&", props))
+                return false;
+
+        parts = parts[0].split("/");
+
+        assert parts.length > 0;
+
+        if (parts.length > 1)
+            return false;
+
+        url = parts[0];
+
+        parts = url.split(":");
+
+        assert parts.length > 0;
+
+        if (parts.length > 2)
+            return false;
+
+        props.setProperty(PROP_HOST, parts[0]);
+
+        try {
+            props.setProperty(PROP_PORT, String.valueOf(parts.length == 2 ? Integer.valueOf(parts[1]) : DFLT_PORT));
+        }
+        catch (NumberFormatException ignored) {
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Validates and parses URL parameters.
+     *
+     * @param val Parameters string.
+     * @param delim Delimiter.
+     * @param props Properties.
+     * @return Whether URL parameters string is valid.
+     */
+    private boolean parseParameters(String val, String delim, Properties props) {
+        String[] params = val.split(delim);
+
+        for (String param : params) {
+            String[] pair = param.split("=");
+
+            if (pair.length != 2 || pair[0].isEmpty() || pair[1].isEmpty())
+                return false;
+
+            props.setProperty(PROP_PREFIX + pair[0], pair[1]);
+        }
+
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 010bd21..7a01200 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -48,7 +48,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import org.apache.ignite.internal.processors.pool.PoolProcessor;
@@ -330,11 +330,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public GridQueryProcessor query();
 
     /**
-     * Gets ODBC processor.
+     * Gets SQL listener processor.
      *
-     * @return ODBC processor.
+     * @return SQL listener processor.
      */
-    public OdbcProcessor odbc();
+    public SqlListenerProcessor sqlListener();
 
     /**
      * @return Plugin processor.

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index bbc9846..262c5eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -64,7 +64,7 @@ import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
 import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
@@ -160,7 +160,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringInclude
-    private OdbcProcessor odbcProc;
+    private SqlListenerProcessor sqlListenerProc;
 
     /** */
     @GridToStringInclude
@@ -567,8 +567,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             pluginProc = (IgnitePluginProcessor)comp;
         else if (comp instanceof GridQueryProcessor)
             qryProc = (GridQueryProcessor)comp;
-        else if (comp instanceof OdbcProcessor)
-            odbcProc = (OdbcProcessor)comp;
+        else if (comp instanceof SqlListenerProcessor)
+            sqlListenerProc = (SqlListenerProcessor)comp;
         else if (comp instanceof DataStructuresProcessor)
             dataStructuresProc = (DataStructuresProcessor)comp;
         else if (comp instanceof ClusterProcessor)
@@ -824,8 +824,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public OdbcProcessor odbc() {
-        return odbcProc;
+    @Override public SqlListenerProcessor sqlListener() {
+        return sqlListenerProc;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 40476a7..c36fd7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -125,7 +125,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
 import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor;
 import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor;
 import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor;
-import org.apache.ignite.internal.processors.odbc.OdbcProcessor;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
 import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor;
@@ -924,7 +924,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             startProcessor(createComponent(IgniteCacheObjectProcessor.class, ctx));
             startProcessor(new GridCacheProcessor(ctx));startProcessor(new GridClusterStateProcessor(ctx));
             startProcessor(new GridQueryProcessor(ctx));
-            startProcessor(new OdbcProcessor(ctx));
+            startProcessor(new SqlListenerProcessor(ctx));
             startProcessor(new GridServiceProcessor(ctx));
             startProcessor(new GridTaskSessionProcessor(ctx));
             startProcessor(new GridJobProcessor(ctx));

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index 7b5e9d3..7efe4b3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -938,7 +938,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeByteFieldPrimitive(byte val) {
+    public void writeByteFieldPrimitive(byte val) {
         out.unsafeEnsure(1 + 1);
 
         out.unsafeWriteByte(GridBinaryMarshaller.BYTE);
@@ -965,7 +965,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeShortFieldPrimitive(short val) {
+    public void writeShortFieldPrimitive(short val) {
         out.unsafeEnsure(1 + 2);
 
         out.unsafeWriteByte(GridBinaryMarshaller.SHORT);
@@ -985,7 +985,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeIntFieldPrimitive(int val) {
+    public void writeIntFieldPrimitive(int val) {
         out.unsafeEnsure(1 + 4);
 
         out.unsafeWriteByte(GridBinaryMarshaller.INT);
@@ -1005,7 +1005,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeLongFieldPrimitive(long val) {
+    public void writeLongFieldPrimitive(long val) {
         out.unsafeEnsure(1 + 8);
 
         out.unsafeWriteByte(GridBinaryMarshaller.LONG);
@@ -1025,7 +1025,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeFloatFieldPrimitive(float val) {
+    public void writeFloatFieldPrimitive(float val) {
         out.unsafeEnsure(1 + 4);
 
         out.unsafeWriteByte(GridBinaryMarshaller.FLOAT);
@@ -1045,7 +1045,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeDoubleFieldPrimitive(double val) {
+    public void writeDoubleFieldPrimitive(double val) {
         out.unsafeEnsure(1 + 8);
 
         out.unsafeWriteByte(GridBinaryMarshaller.DOUBLE);
@@ -1065,7 +1065,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeCharFieldPrimitive(char val) {
+    public void writeCharFieldPrimitive(char val) {
         out.unsafeEnsure(1 + 2);
 
         out.unsafeWriteByte(GridBinaryMarshaller.CHAR);
@@ -1085,7 +1085,7 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     /**
      * @param val Value.
      */
-    void writeBooleanFieldPrimitive(boolean val) {
+    public void writeBooleanFieldPrimitive(boolean val) {
         out.unsafeEnsure(1 + 1);
 
         out.unsafeWriteByte(GridBinaryMarshaller.BOOLEAN);

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java
new file mode 100644
index 0000000..25d62b4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcConnection.java
@@ -0,0 +1,529 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc.thin;
+
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executor;
+import java.util.logging.Logger;
+
+import static java.sql.ResultSet.CONCUR_READ_ONLY;
+import static java.sql.ResultSet.HOLD_CURSORS_OVER_COMMIT;
+import static java.sql.ResultSet.TYPE_FORWARD_ONLY;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_DISTRIBUTED_JOINS;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_ENFORCE_JOIN_ORDER;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_HOST;
+import static org.apache.ignite.IgniteJdbcThinDriver.PROP_PORT;
+
+/**
+ * JDBC connection implementation.
+ *
+ * See documentation of {@link org.apache.ignite.IgniteJdbcThinDriver} for details.
+ */
+public class JdbcConnection implements Connection {
+    /** Logger. */
+    private static final Logger LOG = Logger.getLogger(JdbcConnection.class.getName());
+
+    /** Cache name. */
+    private String schemaName;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /** Current transaction isolation. */
+    private int txIsolation;
+
+    /** Auto commit flag. */
+    private boolean autoCommit;
+
+    /** Current transaction holdability. */
+    private int holdability;
+
+    /** Timeout. */
+    private int timeout;
+
+    /** Ignite endpoint. */
+    private JdbcTcpIo cliIo;
+
+    /**
+     * Creates new connection.
+     *
+     * @param url Connection URL.
+     * @param props Additional properties.
+     * @throws SQLException In case Ignite client failed to start.
+     */
+    public JdbcConnection(String url, Properties props) throws SQLException {
+        assert url != null;
+        assert props != null;
+
+        holdability = HOLD_CURSORS_OVER_COMMIT;
+        autoCommit = true;
+        txIsolation = Connection.TRANSACTION_NONE;
+
+        boolean distributedJoins = Boolean.parseBoolean(props.getProperty(PROP_DISTRIBUTED_JOINS, "false"));
+        boolean enforceJoinOrder = Boolean.parseBoolean(props.getProperty(PROP_ENFORCE_JOIN_ORDER, "false"));
+
+        String host = props.getProperty(PROP_HOST);
+        String portStr = props.getProperty(PROP_PORT);
+
+        try {
+            int port = Integer.parseInt(portStr);
+
+            if (port <= 0 || port > 0xFFFF)
+                throw new SQLException("Invalid port: " + portStr);
+        }
+        catch (NumberFormatException e) {
+            throw new SQLException("Invalid port: " + portStr, e);
+        }
+
+        if (host == null || host.trim().isEmpty())
+            throw new SQLException("Host name is empty.");
+
+        String endpoint = host.trim() + ":" + portStr.trim();
+
+        try {
+            cliIo = new JdbcTcpIo(endpoint, distributedJoins, enforceJoinOrder);
+
+            cliIo.start();
+        }
+        catch (Exception e) {
+            cliIo.close();
+
+            throw new SQLException("Failed to connect to Ignite cluster [host=" + host + ", port=" + portStr + ']', e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statement createStatement() throws SQLException {
+        return createStatement(TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statement createStatement(int resSetType, int resSetConcurrency) throws SQLException {
+        return createStatement(resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Statement createStatement(int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        ensureNotClosed();
+
+        checkCursorOptions(resSetType, resSetConcurrency, resSetHoldability);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql) throws SQLException {
+        return prepareStatement(sql, TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int resSetType,
+        int resSetConcurrency) throws SQLException {
+        return prepareStatement(sql, resSetType, resSetConcurrency, HOLD_CURSORS_OVER_COMMIT);
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        ensureNotClosed();
+
+        checkCursorOptions(resSetType, resSetConcurrency, resSetHoldability);
+
+        return null;
+    }
+
+    /**
+     * @param resSetType Cursor option.
+     * @param resSetConcurrency Cursor option.
+     * @param resSetHoldability Cursor option.
+     * @throws SQLException If options unsupported.
+     */
+    private void checkCursorOptions(int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        if (resSetType != TYPE_FORWARD_ONLY)
+            throw new SQLFeatureNotSupportedException("Invalid result set type (only forward is supported.)");
+
+        if (resSetConcurrency != CONCUR_READ_ONLY)
+            throw new SQLFeatureNotSupportedException("Invalid concurrency (updates are not supported).");
+
+        if (resSetHoldability != HOLD_CURSORS_OVER_COMMIT)
+            LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallableStatement prepareCall(String sql) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Callable functions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency)
+        throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Callable functions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String nativeSQL(String sql) throws SQLException {
+        ensureNotClosed();
+
+        return sql;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setAutoCommit(boolean autoCommit) throws SQLException {
+        ensureNotClosed();
+
+        this.autoCommit = autoCommit;
+
+        if (!autoCommit)
+            LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean getAutoCommit() throws SQLException {
+        ensureNotClosed();
+
+        if (!autoCommit)
+            LOG.warning("Transactions are not supported.");
+
+        return autoCommit;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void commit() throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rollback() throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close() throws SQLException {
+        if (closed)
+            return;
+
+        closed = true;
+
+        cliIo.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isClosed() throws SQLException {
+        return closed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public DatabaseMetaData getMetaData() throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setReadOnly(boolean readOnly) throws SQLException {
+        ensureNotClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isReadOnly() throws SQLException {
+        ensureNotClosed();
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setCatalog(String catalog) throws SQLException {
+        ensureNotClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getCatalog() throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTransactionIsolation(int level) throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+
+        txIsolation = level;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getTransactionIsolation() throws SQLException {
+        ensureNotClosed();
+
+        LOG.warning("Transactions are not supported.");
+
+        return txIsolation;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SQLWarning getWarnings() throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void clearWarnings() throws SQLException {
+        ensureNotClosed();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Map<String, Class<?>> getTypeMap() throws SQLException {
+        throw new SQLFeatureNotSupportedException("Types mapping is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Types mapping is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setHoldability(int holdability) throws SQLException {
+        ensureNotClosed();
+
+        if (holdability != HOLD_CURSORS_OVER_COMMIT)
+            LOG.warning("Transactions are not supported.");
+
+        this.holdability = holdability;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getHoldability() throws SQLException {
+        ensureNotClosed();
+
+        return holdability;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Savepoint setSavepoint() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Savepoint setSavepoint(String name) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void rollback(Savepoint savepoint) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Savepoints are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public CallableStatement prepareCall(String sql, int resSetType, int resSetConcurrency,
+        int resSetHoldability) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Callable functions are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, int[] colIndexes) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public PreparedStatement prepareStatement(String sql, String[] colNames) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Clob createClob() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Blob createBlob() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public NClob createNClob() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public SQLXML createSQLXML() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isValid(int timeout) throws SQLException {
+        if (timeout < 0)
+            throw new SQLException("Invalid timeout: " + timeout);
+
+        return !closed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setClientInfo(String name, String val) throws SQLClientInfoException {
+        throw new UnsupportedOperationException("Client info is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setClientInfo(Properties props) throws SQLClientInfoException {
+        throw new UnsupportedOperationException("Client info is not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getClientInfo(String name) throws SQLException {
+        ensureNotClosed();
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Properties getClientInfo() throws SQLException {
+        ensureNotClosed();
+
+        return new Properties();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public Struct createStruct(String typeName, Object[] attrs) throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("SQL-specific types are not supported.");
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <T> T unwrap(Class<T> iface) throws SQLException {
+        if (!isWrapperFor(iface))
+            throw new SQLException("Connection is not a wrapper for " + iface.getName());
+
+        return (T)this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isWrapperFor(Class<?> iface) throws SQLException {
+        return iface != null && iface == Connection.class;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setSchema(String schema) throws SQLException {
+        schemaName = schema;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String getSchema() throws SQLException {
+        return schemaName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void abort(Executor executor) throws SQLException {
+        close();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void setNetworkTimeout(Executor executor, int ms) throws SQLException {
+        if (ms < 0)
+            throw new IllegalArgumentException("Timeout is below zero: " + ms);
+
+        timeout = ms;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int getNetworkTimeout() throws SQLException {
+        return timeout;
+    }
+
+    /**
+     * Ensures that connection is not closed.
+     *
+     * @throws SQLException If connection is closed.
+     */
+    private void ensureNotClosed() throws SQLException {
+        if (closed)
+            throw new SQLException("Connection is closed.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java
new file mode 100644
index 0000000..4946b41
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcTcpIo.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc.thin;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.util.logging.Logger;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.processors.odbc.SqlListenerProtocolVersion;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
+import org.apache.ignite.internal.processors.odbc.SqlListenerNioListener;
+import org.apache.ignite.internal.util.ipc.IpcEndpoint;
+import org.apache.ignite.internal.util.ipc.IpcEndpointFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * JDBC IO layer implementation based on blocking IPC streams.
+ */
+public class JdbcTcpIo {
+    /** Current version. */
+    private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);
+
+    /** Initial output stream capacity. */
+    private static final int HANDSHAKE_MSG_SIZE = 10;
+
+    /** Logger. */
+    private static final Logger log = Logger.getLogger(JdbcTcpIo.class.getName());
+
+    /** Server endpoint address. */
+    private final String endpointAddr;
+
+    /** Endpoint. */
+    private IpcEndpoint endpoint;
+
+    /** Output stream. */
+    private BufferedOutputStream out;
+
+    /** Input stream. */
+    private BufferedInputStream in;
+
+    /** Distributed joins. */
+    private boolean distributedJoins;
+
+    /** Enforce join order. */
+    private boolean enforceJoinOrder;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * @param endpointAddr Endpoint.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     */
+    JdbcTcpIo(String endpointAddr, boolean distributedJoins, boolean enforceJoinOrder) {
+        assert endpointAddr != null;
+
+        this.endpointAddr = endpointAddr;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder= enforceJoinOrder;
+    }
+
+    /**
+     * @throws IgniteCheckedException On error.
+     * @throws IOException On IO error in handshake.
+     */
+    public void start() throws IgniteCheckedException, IOException {
+        endpoint = IpcEndpointFactory.connectEndpoint(endpointAddr, null);
+
+        out = new BufferedOutputStream(endpoint.outputStream());
+        in = new BufferedInputStream(endpoint.inputStream());
+
+        handshake();
+    }
+
+    /**
+     * @throws IOException On error.
+     * @throws IgniteCheckedException On error.
+     */
+    public void handshake() throws IOException, IgniteCheckedException {
+        BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE),
+            null, null);
+
+        writer.writeByte((byte)SqlListenerRequest.HANDSHAKE);
+
+        writer.writeShort(CURRENT_VER.major());
+        writer.writeShort(CURRENT_VER.minor());
+        writer.writeShort(CURRENT_VER.maintenance());
+
+        writer.writeByte(SqlListenerNioListener.JDBC_CLIENT);
+
+        writer.writeBoolean(distributedJoins);
+        writer.writeBoolean(enforceJoinOrder);
+
+        send(writer.array());
+
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, new BinaryHeapInputStream(read()),
+            null, null, false);
+
+        boolean accepted = reader.readBoolean();
+
+        if (accepted)
+            return;
+
+        short maj = reader.readShort();
+        short min = reader.readShort();
+        short maintenance = reader.readShort();
+
+        String err = reader.readString();
+
+        SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(maj, min, maintenance);
+
+        throw new IgniteCheckedException("Handshake failed [driverProtocolVer=" + CURRENT_VER +
+            ", remoteNodeProtocolVer=" + ver + ", err=" + err + ']');
+    }
+
+    /**
+     * @param req ODBC request.
+     * @throws IOException On error.
+     */
+    private void send(byte[] req) throws IOException {
+        int size = req.length;
+
+        out.write(size & 0xFF);
+        out.write((size >> 8) & 0xFF);
+        out.write((size >> 16) & 0xFF);
+        out.write((size >> 24) & 0xFF);
+
+        out.write(req);
+
+        out.flush();
+    }
+
+    /**
+     * @return Bytes of a response from server.
+     * @throws IOException On error.
+     * @throws IgniteCheckedException On error.
+     */
+    private byte[] read() throws IOException, IgniteCheckedException {
+        byte[] sizeBytes = read(4);
+
+        int msgSize  = (((0xFF & sizeBytes[3]) << 24) | ((0xFF & sizeBytes[2]) << 16)
+            | ((0xFF & sizeBytes[1]) << 8) + (0xFF & sizeBytes[0]));
+
+        return read(msgSize);
+    }
+
+    /**
+     * @param size Count of bytes to read from stream.
+     * @return Read bytes.
+     * @throws IOException On error.
+     * @throws IgniteCheckedException On error.
+     */
+    private byte [] read(int size) throws IOException, IgniteCheckedException {
+        int off = 0;
+
+        byte[] data = new byte[size];
+
+        while (off != size) {
+            int res = in.read(data, off, size - off);
+
+            if (res == -1)
+                throw new IgniteCheckedException("Failed to read incoming message (not enough data).");
+
+            off += res;
+        }
+
+        return data;
+    }
+
+    /**
+     * Close the client IO.
+     */
+    public void close() {
+        if (closed)
+            return;
+
+        // Clean up resources.
+        U.closeQuiet(out);
+        U.closeQuiet(in);
+
+        if (endpoint != null)
+            endpoint.close();
+
+        closed = true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
deleted file mode 100644
index cdb3de3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcNioListener.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
-import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser;
-import org.apache.ignite.internal.processors.odbc.odbc.OdbcRequestHandler;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * ODBC message listener.
- */
-public class OdbcNioListener extends GridNioServerListenerAdapter<byte[]> {
-    /** Current version. */
-    private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);
-
-    /** Supported versions. */
-    private static final Set<SqlListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
-
-    /** Connection-related metadata key. */
-    private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
-
-    /** Request ID generator. */
-    private static final AtomicLong REQ_ID_GEN = new AtomicLong();
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock;
-
-    /** Kernal context. */
-    private final GridKernalContext ctx;
-
-    /** Maximum allowed cursors. */
-    private final int maxCursors;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    static {
-        SUPPORTED_VERS.add(CURRENT_VER);
-    }
-
-    /**
-     * Constructor.
-     *
-     * @param ctx Context.
-     * @param busyLock Shutdown busy lock.
-     * @param maxCursors Maximum allowed cursors.
-     */
-    public OdbcNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) {
-        this.ctx = ctx;
-        this.busyLock = busyLock;
-        this.maxCursors = maxCursors;
-
-        log = ctx.log(getClass());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onConnected(GridNioSession ses) {
-        if (log.isDebugEnabled())
-            log.debug("SQL client connected: " + ses.remoteAddress());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
-        if (log.isDebugEnabled()) {
-            if (e == null)
-                log.debug("SQL client disconnected: " + ses.remoteAddress());
-            else
-                log.debug("SQL client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onMessage(GridNioSession ses, byte[] msg) {
-        assert msg != null;
-
-        SqlListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
-
-        if (connCtx == null) {
-            onHandshake(ses, msg);
-
-            return;
-        }
-
-        SqlListenerMessageParser parser = connCtx.parser();
-
-        SqlListenerRequest req;
-
-        try {
-            req = parser.decode(msg);
-        }
-        catch (Exception e) {
-            log.error("Failed to parse SQL client request [err=" + e + ']');
-
-            ses.close();
-
-            return;
-        }
-
-        assert req != null;
-
-        req.requestId(REQ_ID_GEN.incrementAndGet());
-
-        try {
-            long startTime = 0;
-
-            if (log.isDebugEnabled()) {
-                startTime = System.nanoTime();
-
-                log.debug("SQL client request received [reqId=" + req.requestId() + ", addr=" + ses.remoteAddress() +
-                    ", req=" + req + ']');
-            }
-
-            SqlListenerRequestHandler handler = connCtx.handler();
-
-            SqlListenerResponse resp = handler.handle(req);
-
-            if (log.isDebugEnabled()) {
-                long dur = (System.nanoTime() - startTime) / 1000;
-
-                log.debug("SQL client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur  +
-                    ", resp=" + resp.status() + ']');
-            }
-
-            byte[] outMsg = parser.encode(resp);
-
-            ses.send(outMsg);
-        }
-        catch (Exception e) {
-            log.error("Failed to process SQL client request [reqId=" + req.requestId() + ", err=" + e + ']');
-
-            ses.send(parser.encode(new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString())));
-        }
-    }
-
-    /**
-     * Perform handshake.
-     *
-     * @param ses Session.
-     * @param msg Message bytes.
-     */
-    private void onHandshake(GridNioSession ses, byte[] msg) {
-        BinaryInputStream stream = new BinaryHeapInputStream(msg);
-
-        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true);
-
-        byte cmd = reader.readByte();
-
-        if (cmd != SqlListenerRequest.HANDSHAKE) {
-            log.error("Unexpected SQL client request (will close session): " + ses.remoteAddress());
-
-            ses.close();
-
-            return;
-        }
-
-        short verMajor = reader.readShort();
-        short verMinor = reader.readShort();
-        short verMaintenance = reader.readShort();
-
-        SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(verMajor, verMinor, verMaintenance);
-
-        String errMsg = null;
-
-        if (SUPPORTED_VERS.contains(ver)) {
-            // Prepare context.
-            SqlListenerConnectionContext connCtx = prepareContext(ver, reader);
-
-            ses.addMeta(CONN_CTX_META_KEY, connCtx);
-        }
-        else {
-            log.warning("Unsupported version: " + ver.toString());
-
-            errMsg = "Unsupported version.";
-        }
-
-        // Send response.
-        BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);
-
-        if (errMsg == null) {
-            writer.writeBoolean(true);
-        }
-        else {
-            writer.writeBoolean(false);
-            writer.writeShort(CURRENT_VER.major());
-            writer.writeShort(CURRENT_VER.minor());
-            writer.writeShort(CURRENT_VER.maintenance());
-            writer.doWriteString(errMsg);
-        }
-
-        ses.send(writer.array());
-    }
-
-    /**
-     * Prepare context.
-     *
-     * @param ver Version.
-     * @param reader Reader.
-     * @return Context.
-     */
-    private SqlListenerConnectionContext prepareContext(SqlListenerProtocolVersion ver, BinaryReaderExImpl reader) {
-        // TODO: Switch between ODBC and JDBC.
-        boolean distributedJoins = reader.readBoolean();
-        boolean enforceJoinOrder = reader.readBoolean();
-
-        OdbcRequestHandler handler =
-            new OdbcRequestHandler(ctx, busyLock, maxCursors, distributedJoins, enforceJoinOrder);
-
-        OdbcMessageParser parser = new OdbcMessageParser(ctx);
-
-        return new SqlListenerConnectionContext(handler, parser);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
deleted file mode 100644
index 6b8b5a3..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/OdbcProcessor.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc;
-
-import java.net.InetAddress;
-import java.nio.ByteOrder;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.configuration.OdbcConfiguration;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.BinaryMarshaller;
-import org.apache.ignite.internal.processors.GridProcessorAdapter;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.HostAndPortRange;
-import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
-import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
-import org.apache.ignite.internal.util.nio.GridNioFilter;
-import org.apache.ignite.internal.util.nio.GridNioServer;
-import org.apache.ignite.internal.util.nio.GridNioSession;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.marshaller.Marshaller;
-import org.apache.ignite.spi.IgnitePortProtocol;
-import org.apache.ignite.thread.IgniteThreadPoolExecutor;
-
-/**
- * ODBC processor.
- */
-public class OdbcProcessor extends GridProcessorAdapter {
-    /** Default number of selectors. */
-    private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
-
-    /** Default TCP_NODELAY flag. */
-    private static final boolean DFLT_TCP_NODELAY = true;
-
-    /** Default TCP direct buffer flag. */
-    private static final boolean DFLT_TCP_DIRECT_BUF = false;
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
-
-    /** ODBC TCP Server. */
-    private GridNioServer<byte[]> srv;
-
-    /** ODBC executor service. */
-    private ExecutorService odbcExecSvc;
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public OdbcProcessor(GridKernalContext ctx) {
-        super(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
-        IgniteConfiguration cfg = ctx.config();
-
-        OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
-
-        if (odbcCfg != null) {
-            try {
-                Marshaller marsh = cfg.getMarshaller();
-
-                if (marsh != null && !(marsh instanceof BinaryMarshaller))
-                    throw new IgniteCheckedException("ODBC can only be used with BinaryMarshaller (please set it " +
-                        "through IgniteConfiguration.setMarshaller())");
-
-                HostAndPortRange hostPort;
-
-                if (F.isEmpty(odbcCfg.getEndpointAddress())) {
-                    hostPort = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST,
-                        OdbcConfiguration.DFLT_TCP_PORT_FROM,
-                        OdbcConfiguration.DFLT_TCP_PORT_TO
-                    );
-                }
-                else {
-                    hostPort = HostAndPortRange.parse(odbcCfg.getEndpointAddress(),
-                        OdbcConfiguration.DFLT_TCP_PORT_FROM,
-                        OdbcConfiguration.DFLT_TCP_PORT_TO,
-                        "Failed to parse ODBC endpoint address"
-                    );
-                }
-
-                assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
-
-                odbcExecSvc = new IgniteThreadPoolExecutor(
-                    "odbc",
-                    cfg.getIgniteInstanceName(),
-                    odbcCfg.getThreadPoolSize(),
-                    odbcCfg.getThreadPoolSize(),
-                    0,
-                    new LinkedBlockingQueue<Runnable>());
-
-                InetAddress host;
-
-                try {
-                    host = InetAddress.getByName(hostPort.host());
-                }
-                catch (Exception e) {
-                    throw new IgniteCheckedException("Failed to resolve ODBC host: " + hostPort.host(), e);
-                }
-
-                Exception lastErr = null;
-
-                for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
-                    try {
-                        GridNioFilter[] filters = new GridNioFilter[] {
-                            new GridNioAsyncNotifyFilter(ctx.igniteInstanceName(), odbcExecSvc, log) {
-                                @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
-                                    proceedSessionOpened(ses);
-                                }
-                            },
-                            new GridNioCodecFilter(new SqlListenerBufferedParser(), log, false)
-                        };
-
-                        GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
-                            .address(host)
-                            .port(port)
-                            .listener(new OdbcNioListener(ctx, busyLock, odbcCfg.getMaxOpenCursors()))
-                            .logger(log)
-                            .selectorCount(DFLT_SELECTOR_CNT)
-                            .igniteInstanceName(ctx.igniteInstanceName())
-                            .serverName("odbc")
-                            .tcpNoDelay(DFLT_TCP_NODELAY)
-                            .directBuffer(DFLT_TCP_DIRECT_BUF)
-                            .byteOrder(ByteOrder.nativeOrder())
-                            .socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
-                            .socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
-                            .filters(filters)
-                            .directMode(false)
-                            .build();
-
-                        srv0.start();
-
-                        srv = srv0;
-
-                        ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
-
-                        log.info("ODBC processor has started on TCP port " + port);
-
-                        lastErr = null;
-
-                        break;
-                    }
-                    catch (Exception e) {
-                        lastErr = e;
-                    }
-                }
-
-                assert (srv != null && lastErr == null) || (srv == null && lastErr != null);
-
-                if (lastErr != null)
-                    throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" +
-                        "address=" + hostPort + ", lastErr=" + lastErr + ']');
-            }
-            catch (Exception e) {
-                throw new IgniteCheckedException("Failed to start ODBC processor.", e);
-            }
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        if (srv != null) {
-            busyLock.block();
-
-            srv.stop();
-
-            ctx.ports().deregisterPorts(getClass());
-
-            if (odbcExecSvc != null) {
-                U.shutdownNow(getClass(), odbcExecSvc, log);
-
-                odbcExecSvc = null;
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("ODBC processor stopped.");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java
new file mode 100644
index 0000000..9d731ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractMessageParser.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+
+/**
+ * ODBC message parser.
+ */
+public abstract class SqlListenerAbstractMessageParser implements SqlListenerMessageParser {
+    /** Initial output stream capacity. */
+    protected static final int INIT_CAP = 1024;
+
+    /** Kernal context. */
+    protected GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Object reader. */
+    private SqlListenerAbstractObjectReader objReader;
+
+    /** Object writer. */
+    private SqlListenerAbstractObjectWriter objWriter;
+
+    /**
+     * @param ctx Context.
+     * @param objReader Object reader.
+     * @param objWriter Object writer.
+     */
+    protected SqlListenerAbstractMessageParser(final GridKernalContext ctx, SqlListenerAbstractObjectReader objReader,
+        SqlListenerAbstractObjectWriter objWriter) {
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
+
+        this.objReader = objReader;
+        this.objWriter = objWriter;
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlListenerRequest decode(byte[] msg) {
+        assert msg != null;
+
+        BinaryReaderExImpl reader = createReader(msg);
+
+        byte cmd = reader.readByte();
+
+        SqlListenerRequest res;
+
+        switch (cmd) {
+            case SqlListenerRequest.QRY_EXEC: {
+                String cache = reader.readString();
+                String sql = reader.readString();
+                int argsNum = reader.readInt();
+
+                Object[] params = new Object[argsNum];
+
+                for (int i = 0; i < argsNum; ++i)
+                    params[i] = objReader.readObject(reader);
+
+                res = new SqlListenerQueryExecuteRequest(cache, sql, params);
+
+                break;
+            }
+
+            case SqlListenerRequest.QRY_FETCH: {
+                long queryId = reader.readLong();
+                int pageSize = reader.readInt();
+
+                res = new SqlListenerQueryFetchRequest(queryId, pageSize);
+
+                break;
+            }
+
+            case SqlListenerRequest.QRY_CLOSE: {
+                long queryId = reader.readLong();
+
+                res = new SqlListenerQueryCloseRequest(queryId);
+
+                break;
+            }
+
+            case SqlListenerRequest.META_COLS: {
+                String cache = reader.readString();
+                String table = reader.readString();
+                String column = reader.readString();
+
+                res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
+
+                break;
+            }
+
+            case SqlListenerRequest.META_TBLS: {
+                String catalog = reader.readString();
+                String schema = reader.readString();
+                String table = reader.readString();
+                String tableType = reader.readString();
+
+                res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
+
+                break;
+            }
+
+            case SqlListenerRequest.META_PARAMS: {
+                String cacheName = reader.readString();
+                String sqlQuery = reader.readString();
+
+                res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery);
+
+                break;
+            }
+
+            default:
+                throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']');
+        }
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] encode(SqlListenerResponse msg) {
+        assert msg != null;
+
+        // Creating new binary writer
+        BinaryWriterExImpl writer = createWriter(INIT_CAP);
+
+        // Writing status.
+        writer.writeByte((byte) msg.status());
+
+        if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) {
+            writer.writeString(msg.error());
+
+            return writer.array();
+        }
+
+        Object res0 = msg.response();
+
+        if (res0 == null)
+            return writer.array();
+        else if (res0 instanceof SqlListenerQueryExecuteResult) {
+            SqlListenerQueryExecuteResult res = (SqlListenerQueryExecuteResult) res0;
+
+            if (log.isDebugEnabled())
+                log.debug("Resulting query ID: " + res.getQueryId());
+
+            writer.writeLong(res.getQueryId());
+
+            Collection<SqlListenerColumnMeta> metas = res.getColumnsMetadata();
+
+            assert metas != null;
+
+            writer.writeInt(metas.size());
+
+            for (SqlListenerColumnMeta meta : metas)
+                meta.write(writer);
+        }
+        else if (res0 instanceof SqlListenerQueryFetchResult) {
+            SqlListenerQueryFetchResult res = (SqlListenerQueryFetchResult) res0;
+
+            if (log.isDebugEnabled())
+                log.debug("Resulting query ID: " + res.queryId());
+
+            writer.writeLong(res.queryId());
+
+            Collection<?> items0 = res.items();
+
+            assert items0 != null;
+
+            writer.writeBoolean(res.last());
+
+            writer.writeInt(items0.size());
+
+            for (Object row0 : items0) {
+                if (row0 != null) {
+                    Collection<?> row = (Collection<?>)row0;
+
+                    writer.writeInt(row.size());
+
+                    for (Object obj : row)
+                        objWriter.writeObject(writer, obj);
+                }
+            }
+        }
+        else if (res0 instanceof SqlListenerQueryCloseResult) {
+            SqlListenerQueryCloseResult res = (SqlListenerQueryCloseResult) res0;
+
+            if (log.isDebugEnabled())
+                log.debug("Resulting query ID: " + res.getQueryId());
+
+            writer.writeLong(res.getQueryId());
+        }
+        else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
+            OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
+
+            Collection<SqlListenerColumnMeta> columnsMeta = res.meta();
+
+            assert columnsMeta != null;
+
+            writer.writeInt(columnsMeta.size());
+
+            for (SqlListenerColumnMeta columnMeta : columnsMeta)
+                columnMeta.write(writer);
+        }
+        else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
+            OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
+
+            Collection<OdbcTableMeta> tablesMeta = res.meta();
+
+            assert tablesMeta != null;
+
+            writer.writeInt(tablesMeta.size());
+
+            for (OdbcTableMeta tableMeta : tablesMeta)
+                tableMeta.writeBinary(writer);
+        }
+        else if (res0 instanceof OdbcQueryGetParamsMetaResult) {
+            OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0;
+
+            byte[] typeIds = res.typeIds();
+
+            objWriter.writeObject(writer, typeIds);
+        }
+        else
+            assert false : "Should not reach here.";
+
+        return writer.array();
+    }
+
+    /**
+     * Create reader.
+     *
+     * @param msg Input message.
+     * @return Reader.
+     */
+    protected abstract BinaryReaderExImpl createReader(byte[] msg);
+
+    /**
+     * Create writer.
+     *
+     * @param cap Initial capacity.
+     * @return Binary writer instance.
+     */
+    protected abstract BinaryWriterExImpl createWriter(int cap);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
new file mode 100644
index 0000000..18162e6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectReader.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Binary reader with marshaling non-primitive and non-embedded objects with JDK marshaller.
+ */
+@SuppressWarnings("unchecked")
+public abstract class SqlListenerAbstractObjectReader {
+    /**
+     * @param reader Reader.
+     * @return Read object.
+     * @throws BinaryObjectException On error.
+     */
+    @Nullable public Object readObject(BinaryReaderExImpl reader) throws BinaryObjectException {
+        byte type = reader.readByte();
+
+        switch (type) {
+            case GridBinaryMarshaller.NULL:
+                return null;
+
+            case GridBinaryMarshaller.BOOLEAN:
+                return reader.readBoolean();
+
+            case GridBinaryMarshaller.BYTE:
+                return reader.readByte();
+
+            case GridBinaryMarshaller.CHAR:
+                return reader.readChar();
+
+            case GridBinaryMarshaller.SHORT:
+                return reader.readShort();
+
+            case GridBinaryMarshaller.INT:
+                return reader.readInt();
+
+            case GridBinaryMarshaller.LONG:
+                return reader.readLong();
+
+            case GridBinaryMarshaller.FLOAT:
+                return reader.readFloat();
+
+            case GridBinaryMarshaller.DOUBLE:
+                return reader.readDouble();
+
+            case GridBinaryMarshaller.STRING:
+                return BinaryUtils.doReadString(reader.in());
+
+            case GridBinaryMarshaller.DECIMAL:
+                return BinaryUtils.doReadDecimal(reader.in());
+
+            case GridBinaryMarshaller.UUID:
+                return BinaryUtils.doReadUuid(reader.in());
+
+            case GridBinaryMarshaller.TIME:
+                return BinaryUtils.doReadTime(reader.in());
+
+            case GridBinaryMarshaller.TIMESTAMP:
+                return BinaryUtils.doReadTimestamp(reader.in());
+
+            case GridBinaryMarshaller.DATE:
+                return BinaryUtils.doReadDate(reader.in());
+
+            case GridBinaryMarshaller.BOOLEAN_ARR:
+                return BinaryUtils.doReadBooleanArray(reader.in());
+
+            case GridBinaryMarshaller.BYTE_ARR:
+                return BinaryUtils.doReadByteArray(reader.in());
+
+            case GridBinaryMarshaller.CHAR_ARR:
+                return BinaryUtils.doReadCharArray(reader.in());
+
+            case GridBinaryMarshaller.SHORT_ARR:
+                return BinaryUtils.doReadShortArray(reader.in());
+
+            case GridBinaryMarshaller.INT_ARR:
+                return BinaryUtils.doReadIntArray(reader.in());
+
+            case GridBinaryMarshaller.FLOAT_ARR:
+                return BinaryUtils.doReadFloatArray(reader.in());
+
+            case GridBinaryMarshaller.DOUBLE_ARR:
+                return BinaryUtils.doReadDoubleArray(reader.in());
+
+            case GridBinaryMarshaller.STRING_ARR:
+                return BinaryUtils.doReadStringArray(reader.in());
+
+            case GridBinaryMarshaller.DECIMAL_ARR:
+                return BinaryUtils.doReadDecimalArray(reader.in());
+
+            case GridBinaryMarshaller.UUID_ARR:
+                return BinaryUtils.doReadUuidArray(reader.in());
+
+            case GridBinaryMarshaller.TIME_ARR:
+                return BinaryUtils.doReadTimeArray(reader.in());
+
+            case GridBinaryMarshaller.TIMESTAMP_ARR:
+                return BinaryUtils.doReadTimestampArray(reader.in());
+
+            case GridBinaryMarshaller.DATE_ARR:
+                return BinaryUtils.doReadDateArray(reader.in());
+
+            default:
+                reader.in().position(reader.in().position() - 1);
+
+                return readCustomObject(reader);
+        }
+    }
+
+    /**
+     * @param reader Reader.
+     * @return An object is unmarshaled by marshaller.
+     * @throws BinaryObjectException On error.
+     */
+    protected abstract Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException;
+}


Mime
View raw message