ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From agoncha...@apache.org
Subject [20/41] ignite git commit: IGNITE-5163: Implemented infrastructure for the new JDBC driver. This closes #1912.
Date Wed, 24 May 2017 17:27:04 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java
new file mode 100644
index 0000000..f5e9924
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerAbstractObjectWriter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.math.BigDecimal;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Binary writer with marshaling non-primitive and non-embedded objects with JDK marshaller..
+ */
+public abstract class SqlListenerAbstractObjectWriter {
+    /**
+     * @param writer Writer.
+     * @param obj Object to write.
+     * @throws BinaryObjectException On error.
+     */
+    public void writeObject(BinaryWriterExImpl writer, @Nullable Object obj) throws BinaryObjectException {
+        if (obj == null) {
+            writer.writeByte(GridBinaryMarshaller.NULL);
+
+            return;
+        }
+
+        Class<?> cls = obj.getClass();
+
+        if (cls == Boolean.class)
+            writer.writeBooleanFieldPrimitive((Boolean)obj);
+        else if (cls == Byte.class)
+            writer.writeByteFieldPrimitive((Byte)obj);
+        else if (cls == Character.class)
+            writer.writeCharFieldPrimitive((Character)obj);
+        else if (cls == Short.class)
+            writer.writeShortFieldPrimitive((Short)obj);
+        else if (cls == Integer.class)
+            writer.writeIntFieldPrimitive((Integer)obj);
+        else if (cls == Long.class)
+            writer.writeLongFieldPrimitive((Long)obj);
+        else if (cls == Float.class)
+            writer.writeFloatFieldPrimitive((Float)obj);
+        else if (cls == Double.class)
+            writer.writeDoubleFieldPrimitive((Double)obj);
+        else if (cls == String.class)
+            writer.doWriteString((String)obj);
+        else if (cls == BigDecimal.class)
+            writer.doWriteDecimal((BigDecimal)obj);
+        else if (cls == UUID.class)
+            writer.writeUuid((UUID)obj);
+        else if (cls == Time.class)
+            writer.writeTime((Time)obj);
+        else if (cls == Timestamp.class)
+            writer.writeTimestamp((Timestamp)obj);
+        else if (cls == java.sql.Date.class || cls == java.util.Date.class)
+            writer.writeDate((java.util.Date)obj);
+        else if (cls == boolean[].class)
+            writer.writeBooleanArray((boolean[])obj);
+        else if (cls == byte[].class)
+            writer.writeByteArray((byte[])obj);
+        else if (cls == char[].class)
+            writer.writeCharArray((char[])obj);
+        else if (cls == short[].class)
+            writer.writeShortArray((short[])obj);
+        else if (cls == int[].class)
+            writer.writeIntArray((int[])obj);
+        else if (cls == float[].class)
+            writer.writeFloatArray((float[])obj);
+        else if (cls == double[].class)
+            writer.writeDoubleArray((double[])obj);
+        else if (cls == String[].class)
+            writer.writeStringArray((String[])obj);
+        else if (cls == BigDecimal[].class)
+            writer.writeDecimalArray((BigDecimal[])obj);
+        else if (cls == UUID[].class)
+            writer.writeUuidArray((UUID[])obj);
+        else if (cls == Time[].class)
+            writer.writeTimeArray((Time[])obj);
+        else if (cls == Timestamp[].class)
+            writer.writeTimestampArray((Timestamp[])obj);
+        else if (cls == java.util.Date[].class || cls == java.sql.Date[].class)
+            writer.writeDateArray((java.util.Date[])obj);
+        else
+            writeCustomObject(writer, obj);
+    }
+
+    /**
+     * @param writer Writer.
+     * @param obj Object to marshal with marshaller and write to binary stream.
+     * @throws BinaryObjectException On error.
+     */
+    protected abstract void writeCustomObject(BinaryWriterExImpl writer, Object obj) throws BinaryObjectException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
new file mode 100644
index 0000000..9eaec04
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerNioListener.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMessageParser;
+import org.apache.ignite.internal.processors.odbc.odbc.OdbcMessageParser;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * NIO listener for SQL client connections. The first message of a session is treated as a handshake that
+ * selects the ODBC or JDBC message parser; every later message is decoded, handled and answered through
+ * the parser and handler stored in the session's connection context.
+ */
+public class SqlListenerNioListener extends GridNioServerListenerAdapter<byte[]> {
+    /** The value corresponds to ODBC driver of the parser field of the handshake request. */
+    public static final byte ODBC_CLIENT = 0;
+
+    /** The value corresponds to JDBC driver of the parser field of the handshake request. */
+    public static final byte JDBC_CLIENT = 1;
+
+    /** Current version. */
+    private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 0);
+
+    /** Supported versions. */
+    private static final Set<SqlListenerProtocolVersion> SUPPORTED_VERS = new HashSet<>();
+
+    /** Connection-related metadata key. */
+    private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Request ID generator. */
+    private static final AtomicLong REQ_ID_GEN = new AtomicLong();
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock;
+
+    /** Kernal context. */
+    private final GridKernalContext ctx;
+
+    /** Maximum allowed cursors. */
+    private final int maxCursors;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    static {
+        SUPPORTED_VERS.add(CURRENT_VER);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param busyLock Shutdown busy lock.
+     * @param maxCursors Maximum allowed cursors.
+     */
+    public SqlListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) {
+        this.ctx = ctx;
+        this.busyLock = busyLock;
+        this.maxCursors = maxCursors;
+
+        log = ctx.log(getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onConnected(GridNioSession ses) {
+        if (log.isDebugEnabled())
+            log.debug("SQL client connected: " + ses.remoteAddress());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+        if (log.isDebugEnabled()) {
+            if (e == null)
+                log.debug("SQL client disconnected: " + ses.remoteAddress());
+            else
+                log.debug("SQL client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']');
+        }
+    }
+
+    /**
+     * Handles an incoming message. A session without a connection context is still in the handshake phase;
+     * otherwise the message is decoded, passed to the request handler and the encoded response is sent back.
+     * A message that cannot be decoded closes the session; a failure while handling produces a
+     * {@link SqlListenerResponse#STATUS_FAILED} response.
+     *
+     * @param ses Session.
+     * @param msg Message bytes.
+     */
+    @Override public void onMessage(GridNioSession ses, byte[] msg) {
+        assert msg != null;
+
+        SqlListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY);
+
+        if (connCtx == null) {
+            onHandshake(ses, msg);
+
+            return;
+        }
+
+        SqlListenerMessageParser parser = connCtx.parser();
+
+        SqlListenerRequest req;
+
+        try {
+            req = parser.decode(msg);
+        }
+        catch (Exception e) {
+            // Pass the throwable to the logger so that the stack trace is not lost.
+            log.error("Failed to parse SQL client request [err=" + e + ']', e);
+
+            ses.close();
+
+            return;
+        }
+
+        assert req != null;
+
+        req.requestId(REQ_ID_GEN.incrementAndGet());
+
+        try {
+            long startTime = 0;
+
+            if (log.isDebugEnabled()) {
+                startTime = System.nanoTime();
+
+                log.debug("SQL client request received [reqId=" + req.requestId() + ", addr=" + ses.remoteAddress() +
+                    ", req=" + req + ']');
+            }
+
+            SqlListenerRequestHandler handler = connCtx.handler();
+
+            SqlListenerResponse resp = handler.handle(req);
+
+            if (log.isDebugEnabled()) {
+                long dur = (System.nanoTime() - startTime) / 1000;
+
+                log.debug("SQL client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur  +
+                    ", resp=" + resp.status() + ']');
+            }
+
+            byte[] outMsg = parser.encode(resp);
+
+            ses.send(outMsg);
+        }
+        catch (Exception e) {
+            log.error("Failed to process SQL client request [reqId=" + req.requestId() + ", err=" + e + ']', e);
+
+            ses.send(parser.encode(new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString())));
+        }
+    }
+
+    /**
+     * Perform handshake. Reads the command byte and the protocol version, prepares the connection context for
+     * a supported version and replies with {@code true}, or with {@code false} followed by the current
+     * version and an error message. A message that is not a readable handshake request closes the session.
+     *
+     * @param ses Session.
+     * @param msg Message bytes.
+     */
+    private void onHandshake(GridNioSession ses, byte[] msg) {
+        BinaryInputStream stream = new BinaryHeapInputStream(msg);
+
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true);
+
+        SqlListenerProtocolVersion ver;
+
+        try {
+            byte cmd = reader.readByte();
+
+            if (cmd != SqlListenerRequest.HANDSHAKE) {
+                log.error("Unexpected SQL client request (will close session): " + ses.remoteAddress());
+
+                ses.close();
+
+                return;
+            }
+
+            short verMajor = reader.readShort();
+            short verMinor = reader.readShort();
+            short verMaintenance = reader.readShort();
+
+            ver = SqlListenerProtocolVersion.create(verMajor, verMinor, verMaintenance);
+        }
+        catch (Exception e) {
+            // Any failure while reading the header means the bytes are not a usable handshake request.
+            log.error("Failed to parse SQL client handshake (will close session): " + ses.remoteAddress(), e);
+
+            ses.close();
+
+            return;
+        }
+
+        String errMsg = null;
+
+        if (SUPPORTED_VERS.contains(ver)) {
+            try {
+                // Prepare context.
+                SqlListenerConnectionContext connCtx = prepareContext(ver, reader);
+
+                ses.addMeta(CONN_CTX_META_KEY, connCtx);
+            }
+            catch (Exception e) {
+                // E.g. unknown client type: report it to the client instead of letting the exception escape.
+                log.warning("Failed to prepare SQL client connection context [addr=" + ses.remoteAddress() +
+                    ", err=" + e + ']');
+
+                errMsg = "Failed to process handshake: " + e.getMessage();
+            }
+        }
+        else {
+            log.warning("Unsupported version: " + ver.toString());
+
+            errMsg = "Unsupported version.";
+        }
+
+        // Send response.
+        BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null);
+
+        if (errMsg == null)
+            writer.writeBoolean(true);
+        else {
+            writer.writeBoolean(false);
+            writer.writeShort(CURRENT_VER.major());
+            writer.writeShort(CURRENT_VER.minor());
+            writer.writeShort(CURRENT_VER.maintenance());
+            writer.doWriteString(errMsg);
+        }
+
+        ses.send(writer.array());
+    }
+
+    /**
+     * Prepare context. Reads the client type and the two join flags from the handshake request and creates
+     * the request handler together with the parser matching the client type.
+     *
+     * @param ver Version.
+     * @param reader Reader.
+     * @return Context.
+     * @throws IgniteException If the client type is neither {@link #ODBC_CLIENT} nor {@link #JDBC_CLIENT}.
+     */
+    private SqlListenerConnectionContext prepareContext(SqlListenerProtocolVersion ver, BinaryReaderExImpl reader) {
+        byte clientType = reader.readByte();
+
+        boolean distributedJoins = reader.readBoolean();
+        boolean enforceJoinOrder = reader.readBoolean();
+
+        SqlListenerMessageParser parser;
+
+        // Resolve the parser first so that an unknown client type fails before a handler is created.
+        switch (clientType) {
+            case ODBC_CLIENT:
+                parser = new OdbcMessageParser(ctx);
+
+                break;
+
+            case JDBC_CLIENT:
+                parser = new JdbcMessageParser(ctx);
+
+                break;
+
+            default:
+                throw new IgniteException("Unknown client type: " + clientType);
+        }
+
+        SqlListenerRequestHandlerImpl handler = new SqlListenerRequestHandlerImpl(ctx, busyLock, maxCursors,
+            distributedJoins, enforceJoinOrder);
+
+        return new SqlListenerConnectionContext(handler, parser);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java
new file mode 100644
index 0000000..cbe54df
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerProcessor.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.net.InetAddress;
+import java.nio.ByteOrder;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.OdbcConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.HostAndPortRange;
+import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter;
+import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
+import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioServer;
+import org.apache.ignite.internal.util.nio.GridNioSession;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.spi.IgnitePortProtocol;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+
+/**
+ * ODBC processor. When an {@link OdbcConfiguration} is present it starts a TCP server on the first port of
+ * the configured range that can be bound and dispatches client messages to {@link SqlListenerNioListener}.
+ */
+public class SqlListenerProcessor extends GridProcessorAdapter {
+    /** Default number of selectors. */
+    private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors());
+
+    /** Default TCP_NODELAY flag. */
+    private static final boolean DFLT_TCP_NODELAY = true;
+
+    /** Default TCP direct buffer flag. */
+    private static final boolean DFLT_TCP_DIRECT_BUF = false;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+
+    /** ODBC TCP Server. */
+    private GridNioServer<byte[]> srv;
+
+    /** ODBC executor service. */
+    private ExecutorService odbcExecSvc;
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public SqlListenerProcessor(GridKernalContext ctx) {
+        super(ctx);
+    }
+
+    /**
+     * Starts the listener if ODBC is configured: resolves the endpoint, creates the request thread pool and
+     * tries every port of the range until the server starts.
+     *
+     * @param activeOnStart Not used by this processor.
+     * @throws IgniteCheckedException If the endpoint cannot be parsed or resolved, or no port could be bound.
+     */
+    @Override public void start(boolean activeOnStart) throws IgniteCheckedException {
+        IgniteConfiguration cfg = ctx.config();
+
+        OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration();
+
+        if (odbcCfg != null) {
+            try {
+                HostAndPortRange hostPort;
+
+                if (F.isEmpty(odbcCfg.getEndpointAddress())) {
+                    hostPort = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST,
+                        OdbcConfiguration.DFLT_TCP_PORT_FROM,
+                        OdbcConfiguration.DFLT_TCP_PORT_TO
+                    );
+                }
+                else {
+                    hostPort = HostAndPortRange.parse(odbcCfg.getEndpointAddress(),
+                        OdbcConfiguration.DFLT_TCP_PORT_FROM,
+                        OdbcConfiguration.DFLT_TCP_PORT_TO,
+                        "Failed to parse ODBC endpoint address"
+                    );
+                }
+
+                assertParameter(odbcCfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
+
+                odbcExecSvc = new IgniteThreadPoolExecutor(
+                    "odbc",
+                    cfg.getIgniteInstanceName(),
+                    odbcCfg.getThreadPoolSize(),
+                    odbcCfg.getThreadPoolSize(),
+                    0,
+                    new LinkedBlockingQueue<Runnable>());
+
+                InetAddress host;
+
+                try {
+                    host = InetAddress.getByName(hostPort.host());
+                }
+                catch (Exception e) {
+                    throw new IgniteCheckedException("Failed to resolve ODBC host: " + hostPort.host(), e);
+                }
+
+                Exception lastErr = null;
+
+                for (int port = hostPort.portFrom(); port <= hostPort.portTo(); port++) {
+                    try {
+                        GridNioFilter[] filters = new GridNioFilter[] {
+                            new GridNioAsyncNotifyFilter(ctx.igniteInstanceName(), odbcExecSvc, log) {
+                                @Override public void onSessionOpened(GridNioSession ses) throws IgniteCheckedException {
+                                    proceedSessionOpened(ses);
+                                }
+                            },
+                            new GridNioCodecFilter(new SqlListenerBufferedParser(), log, false)
+                        };
+
+                        GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder()
+                            .address(host)
+                            .port(port)
+                            .listener(new SqlListenerNioListener(ctx, busyLock, odbcCfg.getMaxOpenCursors()))
+                            .logger(log)
+                            .selectorCount(DFLT_SELECTOR_CNT)
+                            .igniteInstanceName(ctx.igniteInstanceName())
+                            .serverName("odbc")
+                            .tcpNoDelay(DFLT_TCP_NODELAY)
+                            .directBuffer(DFLT_TCP_DIRECT_BUF)
+                            .byteOrder(ByteOrder.nativeOrder())
+                            .socketSendBufferSize(odbcCfg.getSocketSendBufferSize())
+                            .socketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize())
+                            .filters(filters)
+                            .directMode(false)
+                            .build();
+
+                        srv0.start();
+
+                        srv = srv0;
+
+                        ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass());
+
+                        if (log.isInfoEnabled())
+                            log.info("ODBC processor has started on TCP port " + port);
+
+                        lastErr = null;
+
+                        break;
+                    }
+                    catch (Exception e) {
+                        // Remember the failure and try the next port of the range.
+                        lastErr = e;
+                    }
+                }
+
+                assert (srv != null && lastErr == null) || (srv == null && lastErr != null);
+
+                // Attach the last failure as the cause so that its stack trace is preserved.
+                if (lastErr != null)
+                    throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" +
+                        "address=" + hostPort + ", lastErr=" + lastErr + ']', lastErr);
+            }
+            catch (Exception e) {
+                // Do not leave the thread pool behind when the processor failed to start.
+                if (odbcExecSvc != null) {
+                    U.shutdownNow(getClass(), odbcExecSvc, log);
+
+                    odbcExecSvc = null;
+                }
+
+                throw new IgniteCheckedException("Failed to start ODBC processor.", e);
+            }
+        }
+    }
+
+    /**
+     * Stops the server (if it was started) and shuts down the request thread pool (if it was created).
+     *
+     * @param cancel Not used by this processor.
+     */
+    @Override public void onKernalStop(boolean cancel) {
+        boolean started = srv != null;
+
+        if (started) {
+            busyLock.block();
+
+            srv.stop();
+
+            ctx.ports().deregisterPorts(getClass());
+        }
+
+        // The pool is released independently of the server so it cannot outlive a partially started processor.
+        if (odbcExecSvc != null) {
+            U.shutdownNow(getClass(), odbcExecSvc, log);
+
+            odbcExecSvc = null;
+        }
+
+        if (started && log.isDebugEnabled())
+            log.debug("ODBC processor stopped.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java
new file mode 100644
index 0000000..1230bb4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerRequestHandlerImpl.java
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_COLS;
+import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_PARAMS;
+import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_TBLS;
+import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_CLOSE;
+import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_EXEC;
+import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_FETCH;
+
+/**
+ * SQL query handler.
+ */
+public class SqlListenerRequestHandlerImpl implements SqlListenerRequestHandler {
+    /** Query ID sequence. */
+    private static final AtomicLong QRY_ID_GEN = new AtomicLong();
+
+    /** Kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock;
+
+    /** Maximum allowed cursors. */
+    private final int maxCursors;
+
+    /** Current queries cursors. */
+    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>();
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /**
+     * Creates a handler bound to the given kernal context; the logger is obtained from that context.
+     *
+     * @param ctx Context.
+     * @param busyLock Shutdown busy lock.
+     * @param maxCursors Maximum allowed cursors.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     */
+    public SqlListenerRequestHandlerImpl(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
+        boolean distributedJoins, boolean enforceJoinOrder) {
+        // Infrastructure.
+        this.ctx = ctx;
+        log = ctx.log(getClass());
+        this.busyLock = busyLock;
+
+        // Per-connection query settings.
+        this.enforceJoinOrder = enforceJoinOrder;
+        this.distributedJoins = distributedJoins;
+        this.maxCursors = maxCursors;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Dispatches the request by its command code while holding the busy lock; a request that arrives after
+     * the lock has been blocked, or one with an unknown command, yields a failed response.
+     */
+    @Override public SqlListenerResponse handle(SqlListenerRequest req) {
+        assert req != null;
+
+        boolean entered = busyLock.enterBusy();
+
+        if (!entered) {
+            String stoppingMsg = "Failed to handle ODBC request because node is stopping: " + req;
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, stoppingMsg);
+        }
+
+        try {
+            switch (req.command()) {
+                case QRY_EXEC:
+                    return executeQuery((SqlListenerQueryExecuteRequest)req);
+
+                case QRY_FETCH:
+                    return fetchQuery((SqlListenerQueryFetchRequest)req);
+
+                case QRY_CLOSE:
+                    return closeQuery((SqlListenerQueryCloseRequest)req);
+
+                case META_COLS:
+                    return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req);
+
+                case META_TBLS:
+                    return getTablesMeta((OdbcQueryGetTablesMetaRequest)req);
+
+                case META_PARAMS:
+                    return getParamsMeta((OdbcQueryGetParamsMetaRequest)req);
+
+                default:
+                    return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
+                        "Unsupported ODBC request: " + req);
+            }
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * {@link SqlListenerQueryExecuteRequest} command handler. Rejects the request when the open cursor limit
+     * is reached, otherwise runs the (escape-parsed) SQL against the requested cache in keep-binary mode and
+     * registers the resulting cursor under a new query ID.
+     *
+     * @param req Execute query request.
+     * @return Response with the query ID and columns metadata, or a failed response.
+     */
+    private SqlListenerResponse executeQuery(SqlListenerQueryExecuteRequest req) {
+        int cursorCnt = qryCursors.size();
+
+        if (maxCursors > 0 && cursorCnt >= maxCursors)
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
+                "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
+                "[maximum=" + maxCursors + ", current=" + cursorCnt + ']');
+
+        long qryId = QRY_ID_GEN.getAndIncrement();
+
+        // Declared outside of the try block so that the cursor can be released if a later step fails.
+        QueryCursor qryCur = null;
+
+        try {
+            String sql = OdbcEscapeUtils.parse(req.sqlQuery());
+
+            if (log.isDebugEnabled())
+                log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() +
+                    ", parsed=" + sql + ']');
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+            qry.setArgs(req.arguments());
+
+            qry.setDistributedJoins(distributedJoins);
+            qry.setEnforceJoinOrder(enforceJoinOrder);
+
+            IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.cacheName());
+
+            if (cache0 == null)
+                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Cache doesn't exist (did you configure it?): " + req.cacheName());
+
+            IgniteCache<Object, Object> cache = cache0.withKeepBinary();
+
+            if (cache == null)
+                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Can not get cache with keep binary: " + req.cacheName());
+
+            qryCur = cache.query(qry);
+
+            // The iterator is created lazily on the first fetch.
+            qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null));
+
+            List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
+
+            SqlListenerQueryExecuteResult res = new SqlListenerQueryExecuteResult(qryId, convertMetadata(fieldsMeta));
+
+            return new SqlListenerResponse(res);
+        }
+        catch (Exception e) {
+            qryCursors.remove(qryId);
+
+            // The client never learns the query ID on failure, so nobody else could close this cursor.
+            if (qryCur != null) {
+                try {
+                    qryCur.close();
+                }
+                catch (Exception closeErr) {
+                    e.addSuppressed(closeErr);
+                }
+            }
+
+            U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link SqlListenerQueryCloseRequest} command handler: closes the cursor stored in {@code qryCursors}
+     * under the requested query ID and removes the entry.
+     *
+     * @param req Close query request.
+     * @return Response with {@link SqlListenerQueryCloseResult} on success; failed response if the query ID
+     *      is unknown or an exception was thrown while closing.
+     */
+    private SqlListenerResponse closeQuery(SqlListenerQueryCloseRequest req) {
+        long qryId = req.queryId();
+
+        try {
+            IgniteBiTuple<QueryCursor, Iterator> entry = qryCursors.get(qryId);
+
+            if (entry != null) {
+                QueryCursor cursor = entry.get1();
+
+                assert(cursor != null);
+
+                cursor.close();
+
+                qryCursors.remove(qryId);
+
+                return new SqlListenerResponse(new SqlListenerQueryCloseResult(qryId));
+            }
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
+                "Failed to find query with ID: " + qryId);
+        }
+        catch (Exception e) {
+            // The entry is dropped even when close() failed, so a broken cursor does not stay registered.
+            qryCursors.remove(qryId);
+
+            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + qryId + ']', e);
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link SqlListenerQueryFetchRequest} command handler: returns the next page of rows for an open query.
+     *
+     * @param req Fetch query request.
+     * @return Response with {@link SqlListenerQueryFetchResult} holding at most {@code req.pageSize()} rows and
+     *      a flag telling whether the iterator is exhausted; failed response if the query ID is unknown or an
+     *      exception was thrown.
+     */
+    private SqlListenerResponse fetchQuery(SqlListenerQueryFetchRequest req) {
+        try {
+            IgniteBiTuple<QueryCursor, Iterator> entry = qryCursors.get(req.queryId());
+
+            if (entry == null) {
+                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Failed to find query with ID: " + req.queryId());
+            }
+
+            Iterator rows = entry.get2();
+
+            // First fetch for this query: obtain the iterator and store it next to the cursor.
+            if (rows == null) {
+                QueryCursor cursor = entry.get1();
+
+                rows = cursor.iterator();
+
+                entry.put(cursor, rows);
+            }
+
+            List<Object> page = new ArrayList<>();
+
+            int limit = req.pageSize();
+
+            while (page.size() < limit && rows.hasNext())
+                page.add(rows.next());
+
+            boolean last = !rows.hasNext();
+
+            return new SqlListenerResponse(new SqlListenerQueryFetchResult(req.queryId(), page, last));
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryGetColumnsMetaRequest} command handler.
+     * <p>
+     * The table name of the request may be a two-part {@code <cache>.<table>} name; in that case the cache part
+     * (with quotation marks removed if needed) is used instead of {@code req.cacheName()} to look up types.
+     * Table and column names are treated as SQL patterns, see {@link #matches(String, String)}.
+     *
+     * @param req Get columns metadata request.
+     * @return Response with {@link OdbcQueryGetColumnsMetaResult}; failed response if the two-part table name
+     *      is malformed or an exception was thrown.
+     */
+    private SqlListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) {
+        try {
+            List<SqlListenerColumnMeta> meta = new ArrayList<>();
+
+            String cacheName;
+            String tableName;
+
+            String reqTblName = req.tableName();
+
+            // A null table pattern is passed through to matches(), so it must not be dereferenced here.
+            if (reqTblName != null && reqTblName.contains(".")) {
+                // Parsing two-part table name.
+                String[] parts = reqTblName.split("\\.");
+
+                // split() drops trailing empty strings, so "cache." or "." yields fewer than two parts.
+                if (parts.length < 2)
+                    return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
+                        "Invalid two-part table name (expected <cache>.<table>): " + reqTblName);
+
+                cacheName = OdbcUtils.removeQuotationMarksIfNeeded(parts[0]);
+
+                tableName = parts[1];
+            }
+            else {
+                cacheName = OdbcUtils.removeQuotationMarksIfNeeded(req.cacheName());
+
+                tableName = reqTblName;
+            }
+
+            Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
+
+            for (GridQueryTypeDescriptor table : tablesMeta) {
+                if (!matches(table.name(), tableName))
+                    continue;
+
+                for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
+                    if (!matches(field.getKey(), req.columnName()))
+                        continue;
+
+                    // NOTE(review): the reported cache name is req.cacheName(), not the cache name parsed from
+                    // a two-part table name above - confirm this is what the client expects.
+                    SqlListenerColumnMeta columnMeta = new SqlListenerColumnMeta(req.cacheName(), table.name(),
+                        field.getKey(), field.getValue());
+
+                    if (!meta.contains(columnMeta))
+                        meta.add(columnMeta);
+                }
+            }
+
+            OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta);
+
+            return new SqlListenerResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryGetTablesMetaRequest} command handler: collects one {@link OdbcTableMeta} of type
+     * {@code "TABLE"} per query type whose cache name, table name and table type match the request patterns.
+     *
+     * @param req Get tables metadata request.
+     * @return Response with {@link OdbcQueryGetTablesMetaResult}; failed response if an exception was thrown.
+     */
+    private SqlListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) {
+        try {
+            List<OdbcTableMeta> found = new ArrayList<>();
+
+            String schemaPtrn = OdbcUtils.removeQuotationMarksIfNeeded(req.schema());
+
+            for (String cacheName : ctx.cache().cacheNames()) {
+                if (matches(cacheName, schemaPtrn)) {
+                    for (GridQueryTypeDescriptor type : ctx.query().types(cacheName)) {
+                        // Table name is checked first; table type is only checked for matching tables.
+                        boolean accepted = matches(type.name(), req.table()) && matches("TABLE", req.tableType());
+
+                        if (!accepted)
+                            continue;
+
+                        OdbcTableMeta tblMeta = new OdbcTableMeta(null, cacheName, type.name(), "TABLE");
+
+                        if (!found.contains(tblMeta))
+                            found.add(tblMeta);
+                    }
+                }
+            }
+
+            return new SqlListenerResponse(new OdbcQueryGetTablesMetaResult(found));
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryGetParamsMetaRequest} command handler: prepares the query and reports, for every
+     * parameter, the binary type ID corresponding to its SQL type (see {@link #sqlTypeToBinary(int)}).
+     *
+     * @param req Get params metadata request.
+     * @return Response with {@link OdbcQueryGetParamsMetaResult}; failed response if an exception was thrown.
+     */
+    private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) {
+        try {
+            ParameterMetaData paramsMeta = ctx.query().prepareNativeStatement(req.cacheName(), req.query())
+                .getParameterMetaData();
+
+            int cnt = paramsMeta.getParameterCount();
+
+            byte[] typeIds = new byte[cnt];
+
+            // JDBC parameter indexes are 1-based, array indexes are 0-based.
+            for (int idx = 0; idx < cnt; idx++)
+                typeIds[idx] = sqlTypeToBinary(paramsMeta.getParameterType(idx + 1));
+
+            return new SqlListenerResponse(new OdbcQueryGetParamsMetaResult(typeIds));
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants).
+     * <p>
+     * All character types ({@code CHAR}, {@code NCHAR}, {@code VARCHAR}, {@code NVARCHAR}, {@code LONGVARCHAR},
+     * {@code LONGNVARCHAR}) map to {@link GridBinaryMarshaller#STRING}. Binary types and any SQL type that is
+     * not listed explicitly map to {@link GridBinaryMarshaller#BYTE_ARR}.
+     *
+     * @param sqlType SQL type.
+     * @return Binary type.
+     */
+    private static byte sqlTypeToBinary(int sqlType) {
+        switch (sqlType) {
+            case Types.BIGINT:
+                return GridBinaryMarshaller.LONG;
+
+            case Types.BOOLEAN:
+                return GridBinaryMarshaller.BOOLEAN;
+
+            case Types.DATE:
+                return GridBinaryMarshaller.DATE;
+
+            case Types.DOUBLE:
+                return GridBinaryMarshaller.DOUBLE;
+
+            case Types.FLOAT:
+            case Types.REAL:
+                return GridBinaryMarshaller.FLOAT;
+
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                return GridBinaryMarshaller.DECIMAL;
+
+            case Types.INTEGER:
+                return GridBinaryMarshaller.INT;
+
+            case Types.SMALLINT:
+                return GridBinaryMarshaller.SHORT;
+
+            case Types.TIME:
+                return GridBinaryMarshaller.TIME;
+
+            case Types.TIMESTAMP:
+                return GridBinaryMarshaller.TIMESTAMP;
+
+            case Types.TINYINT:
+                return GridBinaryMarshaller.BYTE;
+
+            // Every character type is reported as a string; previously only LONGNVARCHAR of the
+            // "long"/"national" variants was listed and the others fell through to BYTE_ARR.
+            case Types.CHAR:
+            case Types.NCHAR:
+            case Types.VARCHAR:
+            case Types.NVARCHAR:
+            case Types.LONGVARCHAR:
+            case Types.LONGNVARCHAR:
+                return GridBinaryMarshaller.STRING;
+
+            case Types.NULL:
+                return GridBinaryMarshaller.NULL;
+
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            default:
+                return GridBinaryMarshaller.BYTE_ARR;
+        }
+    }
+
+    /**
+     * Wraps every {@link GridQueryFieldMetadata} element of the given collection into a
+     * {@link SqlListenerColumnMeta}, keeping iteration order.
+     *
+     * @param meta Internal query field metadata, may be {@code null}.
+     * @return Odbc query field metadata; empty list if {@code meta} is {@code null}.
+     */
+    private static Collection<SqlListenerColumnMeta> convertMetadata(Collection<?> meta) {
+        List<SqlListenerColumnMeta> converted = new ArrayList<>();
+
+        if (meta == null)
+            return converted;
+
+        for (Object fieldMeta : meta) {
+            assert fieldMeta instanceof GridQueryFieldMetadata;
+
+            converted.add(new SqlListenerColumnMeta((GridQueryFieldMetadata)fieldMeta));
+        }
+
+        return converted;
+    }
+
+    /**
+     * Checks whether string matches SQL pattern.
+     * <p>
+     * In the pattern {@code %} stands for any sequence of characters (including an empty one) and {@code _}
+     * for exactly one character; every other character, including regex metacharacters and backslash, is
+     * matched literally. Both the string and the pattern are upper-cased before comparison. A pattern for
+     * which {@code F.isEmpty()} is true matches any non-null string.
+     *
+     * @param str String.
+     * @param ptrn Pattern.
+     * @return Whether string matches pattern; always {@code false} when {@code str} is {@code null}.
+     */
+    private static boolean matches(String str, String ptrn) {
+        if (str == null)
+            return false;
+
+        if (F.isEmpty(ptrn))
+            return true;
+
+        String upperPtrn = ptrn.toUpperCase();
+
+        StringBuilder regex = new StringBuilder(upperPtrn.length() + 8);
+
+        // Start of the current run of literal (non-wildcard) characters.
+        int litStart = 0;
+
+        for (int i = 0; i <= upperPtrn.length(); i++) {
+            boolean end = i == upperPtrn.length();
+
+            char c = end ? 0 : upperPtrn.charAt(i);
+
+            if (!end && c != '%' && c != '_')
+                continue;
+
+            // Quote the literal run so that regex metacharacters in names (".", "$", "(" and so on)
+            // are compared as plain characters instead of being interpreted as regex syntax.
+            if (i > litStart)
+                regex.append(java.util.regex.Pattern.quote(upperPtrn.substring(litStart, i)));
+
+            if (!end)
+                regex.append(c == '%' ? ".*" : ".");
+
+            litStart = i + 1;
+        }
+
+        return str.toUpperCase().matches(regex.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
new file mode 100644
index 0000000..cf87712
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcMessageParser.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
+import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractMessageParser;
+
+/**
+ * Message parser used for JDBC connections. Objects are read and written through {@link JdbcObjectReader}
+ * and {@link JdbcObjectWriter}.
+ */
+public class JdbcMessageParser extends SqlListenerAbstractMessageParser {
+    /**
+     * @param ctx Kernal context.
+     */
+    public JdbcMessageParser(GridKernalContext ctx) {
+        super(ctx, new JdbcObjectReader(), new JdbcObjectWriter());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected BinaryReaderExImpl createReader(byte[] msg) {
+        ClassLoader ldr = ctx.config().getClassLoader();
+
+        BinaryInputStream in = new BinaryHeapInputStream(msg);
+
+        return new BinaryReaderExImpl(null, in, ldr, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected BinaryWriterExImpl createWriter(int cap) {
+        BinaryHeapOutputStream out = new BinaryHeapOutputStream(cap);
+
+        return new BinaryWriterExImpl(null, out, null, null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java
new file mode 100644
index 0000000..81c8c10
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectReader;
+
+/**
+ * Object reader used for JDBC connections. Custom objects are not supported:
+ * {@link #readCustomObject(BinaryReaderExImpl)} always throws {@link BinaryObjectException}.
+ */
+public class JdbcObjectReader extends SqlListenerAbstractObjectReader {
+    /** {@inheritDoc} */
+    @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException {
+        // Unconditionally rejected: the reader is not touched.
+        throw new BinaryObjectException("JDBC doesn't support custom objects.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java
new file mode 100644
index 0000000..e87ef50
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcObjectWriter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectWriter;
+
+/**
+ * Object writer used for JDBC connections. Custom objects are not supported:
+ * {@link #writeCustomObject(BinaryWriterExImpl, Object)} always throws {@link BinaryObjectException}.
+ */
+public class JdbcObjectWriter extends SqlListenerAbstractObjectWriter {
+    /** {@inheritDoc} */
+    @Override protected void writeCustomObject(BinaryWriterExImpl writer, Object obj)
+        throws BinaryObjectException {
+        // Unconditionally rejected: neither the writer nor the object is touched.
+        throw new BinaryObjectException("JDBC doesn't support custom objects.");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
index af595b9..300385f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
@@ -18,262 +18,51 @@
 package org.apache.ignite.internal.processors.odbc.odbc;
 
 import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryThreadLocalContext;
 import org.apache.ignite.internal.binary.BinaryWriterExImpl;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
 import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaRequest;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaResult;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaRequest;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaResult;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaRequest;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaResult;
-import org.apache.ignite.internal.processors.odbc.OdbcTableMeta;
-import org.apache.ignite.internal.processors.odbc.SqlListenerColumnMeta;
-import org.apache.ignite.internal.processors.odbc.SqlListenerMessageParser;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseResult;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteResult;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchResult;
-import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
-
-import java.util.Collection;
+import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractMessageParser;
 
 /**
- * ODBC message parser.
+ * ODBC message parser.
  */
-public class OdbcMessageParser implements SqlListenerMessageParser {
-    /** Initial output stream capacity. */
-    private static final int INIT_CAP = 1024;
-
+public class OdbcMessageParser extends SqlListenerAbstractMessageParser {
     /** Marshaller. */
     private final GridBinaryMarshaller marsh;
 
-    /** Logger. */
-    private final IgniteLogger log;
-
     /**
      * @param ctx Context.
      */
-    public OdbcMessageParser(final GridKernalContext ctx) {
-        CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
+    public OdbcMessageParser(GridKernalContext ctx) {
+        super(ctx, new OdbcObjectReader(), new OdbcObjectWriter());
 
-        this.marsh = cacheObjProc.marshaller();
+        if (ctx.cacheObjects() instanceof CacheObjectBinaryProcessorImpl) {
+            CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
 
-        this.log = ctx.log(getClass());
+            marsh = cacheObjProc.marshaller();
+        }
+        else {
+            throw new IgniteException("ODBC can only be used with BinaryMarshaller (please set it " +
+                "through IgniteConfiguration.setMarshaller())");
+        }
     }
 
     /** {@inheritDoc} */
-    @Override public SqlListenerRequest decode(byte[] msg) {
-        assert msg != null;
-
+    @Override protected BinaryReaderExImpl createReader(byte[] msg) {
         BinaryInputStream stream = new BinaryHeapInputStream(msg);
 
-        BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true);
-
-        byte cmd = reader.readByte();
-
-        SqlListenerRequest res;
-
-        switch (cmd) {
-            case SqlListenerRequest.QRY_EXEC: {
-                String cache = reader.readString();
-                String sql = reader.readString();
-                int argsNum = reader.readInt();
-
-                Object[] params = new Object[argsNum];
-
-                for (int i = 0; i < argsNum; ++i)
-                    params[i] = reader.readObjectDetached();
-
-                res = new SqlListenerQueryExecuteRequest(cache, sql, params);
-
-                break;
-            }
-
-            case SqlListenerRequest.QRY_FETCH: {
-                long queryId = reader.readLong();
-                int pageSize = reader.readInt();
-
-                res = new SqlListenerQueryFetchRequest(queryId, pageSize);
-
-                break;
-            }
-
-            case SqlListenerRequest.QRY_CLOSE: {
-                long queryId = reader.readLong();
-
-                res = new SqlListenerQueryCloseRequest(queryId);
-
-                break;
-            }
-
-            case SqlListenerRequest.META_COLS: {
-                String cache = reader.readString();
-                String table = reader.readString();
-                String column = reader.readString();
-
-                res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
-
-                break;
-            }
-
-            case SqlListenerRequest.META_TBLS: {
-                String catalog = reader.readString();
-                String schema = reader.readString();
-                String table = reader.readString();
-                String tableType = reader.readString();
-
-                res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
-
-                break;
-            }
-
-            case SqlListenerRequest.META_PARAMS: {
-                String cacheName = reader.readString();
-                String sqlQuery = reader.readString();
-
-                res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery);
-
-                break;
-            }
-
-            default:
-                throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']');
-        }
-
-        return res;
+        return new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true);
     }
 
     /** {@inheritDoc} */
-    @Override public byte[] encode(SqlListenerResponse msg) {
-        assert msg != null;
-
-        // Creating new binary writer
-        BinaryWriterExImpl writer = marsh.writer(new BinaryHeapOutputStream(INIT_CAP));
-
-        // Writing status.
-        writer.writeByte((byte) msg.status());
-
-        if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) {
-            writer.writeString(msg.error());
-
-            return writer.array();
-        }
-
-        Object res0 = msg.response();
-
-        if (res0 == null)
-            return writer.array();
-        else if (res0 instanceof SqlListenerQueryExecuteResult) {
-            SqlListenerQueryExecuteResult res = (SqlListenerQueryExecuteResult) res0;
-
-            if (log.isDebugEnabled())
-                log.debug("Resulting query ID: " + res.getQueryId());
-
-            writer.writeLong(res.getQueryId());
-
-            Collection<SqlListenerColumnMeta> metas = res.getColumnsMetadata();
-
-            assert metas != null;
-
-            writer.writeInt(metas.size());
-
-            for (SqlListenerColumnMeta meta : metas)
-                meta.write(writer);
-        }
-        else if (res0 instanceof SqlListenerQueryFetchResult) {
-            SqlListenerQueryFetchResult res = (SqlListenerQueryFetchResult) res0;
-
-            if (log.isDebugEnabled())
-                log.debug("Resulting query ID: " + res.queryId());
-
-            writer.writeLong(res.queryId());
-
-            Collection<?> items0 = res.items();
-
-            assert items0 != null;
-
-            writer.writeBoolean(res.last());
-
-            writer.writeInt(items0.size());
-
-            for (Object row0 : items0) {
-                if (row0 != null) {
-                    Collection<?> row = (Collection<?>)row0;
-
-                    writer.writeInt(row.size());
-
-                    for (Object obj : row) {
-                        if (obj == null) {
-                            writer.writeObjectDetached(null);
-                            continue;
-                        }
-
-                        Class<?> cls = obj.getClass();
-
-                        if (cls == java.sql.Time.class)
-                            writer.writeTime((java.sql.Time)obj);
-                        else if (cls == java.sql.Timestamp.class)
-                            writer.writeTimestamp((java.sql.Timestamp)obj);
-                        else if (cls == java.sql.Date.class)
-                            writer.writeDate((java.util.Date)obj);
-                        else
-                            writer.writeObjectDetached(obj);
-                    }
-                }
-            }
-        }
-        else if (res0 instanceof SqlListenerQueryCloseResult) {
-            SqlListenerQueryCloseResult res = (SqlListenerQueryCloseResult) res0;
-
-            if (log.isDebugEnabled())
-                log.debug("Resulting query ID: " + res.getQueryId());
-
-            writer.writeLong(res.getQueryId());
-        }
-        else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
-            OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
-
-            Collection<SqlListenerColumnMeta> columnsMeta = res.meta();
-
-            assert columnsMeta != null;
-
-            writer.writeInt(columnsMeta.size());
-
-            for (SqlListenerColumnMeta columnMeta : columnsMeta)
-                columnMeta.write(writer);
-        }
-        else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
-            OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
-
-            Collection<OdbcTableMeta> tablesMeta = res.meta();
-
-            assert tablesMeta != null;
-
-            writer.writeInt(tablesMeta.size());
-
-            for (OdbcTableMeta tableMeta : tablesMeta)
-                tableMeta.writeBinary(writer);
-        }
-        else if (res0 instanceof OdbcQueryGetParamsMetaResult) {
-            OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0;
-
-            byte[] typeIds = res.typeIds();
-
-            writer.writeObjectDetached(typeIds);
-        }
-        else
-            assert false : "Should not reach here.";
-
-        return writer.array();
+    @Override protected BinaryWriterExImpl createWriter(int cap) {
+        return new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(cap),
+            BinaryThreadLocalContext.get().schemaHolder(), null);
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java
new file mode 100644
index 0000000..586fbc5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectReader;
+
+/**
+ * Object reader used for ODBC connections. Custom objects are read with
+ * {@link BinaryReaderExImpl#readObjectDetached()}.
+ */
+public class OdbcObjectReader extends SqlListenerAbstractObjectReader {
+    /** {@inheritDoc} */
+    @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException {
+        return reader.readObjectDetached();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java
new file mode 100644
index 0000000..c2f3aba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectWriter;
+
+/**
+ * ODBC binary writer that writes custom objects via {@code BinaryWriterExImpl#writeObjectDetached(Object)}.
+ */
+public class OdbcObjectWriter extends SqlListenerAbstractObjectWriter {
+    /** {@inheritDoc} Delegates to {@code writer.writeObjectDetached(obj)}. */
+    @Override protected void writeCustomObject(BinaryWriterExImpl writer, Object obj) throws BinaryObjectException {
+        writer.writeObjectDetached(obj);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/6f1dc3ac/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
deleted file mode 100644
index eabc486..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
+++ /dev/null
@@ -1,513 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc.odbc;
-
-import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.query.QueryCursor;
-import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.binary.GridBinaryMarshaller;
-import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaRequest;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetColumnsMetaResult;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaRequest;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetParamsMetaResult;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaRequest;
-import org.apache.ignite.internal.processors.odbc.OdbcQueryGetTablesMetaResult;
-import org.apache.ignite.internal.processors.odbc.OdbcTableMeta;
-import org.apache.ignite.internal.processors.odbc.OdbcUtils;
-import org.apache.ignite.internal.processors.odbc.SqlListenerColumnMeta;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryCloseResult;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryExecuteResult;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerQueryFetchResult;
-import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
-import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler;
-import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
-import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils;
-import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
-import org.apache.ignite.internal.util.GridSpinBusyLock;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiTuple;
-
-import java.sql.ParameterMetaData;
-import java.sql.PreparedStatement;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_COLS;
-import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_PARAMS;
-import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.META_TBLS;
-import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_CLOSE;
-import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_EXEC;
-import static org.apache.ignite.internal.processors.odbc.SqlListenerRequest.QRY_FETCH;
-
-/**
- * SQL query handler.
- */
-public class OdbcRequestHandler implements SqlListenerRequestHandler {
-    /** Query ID sequence. */
-    private static final AtomicLong QRY_ID_GEN = new AtomicLong();
-
-    /** Kernel context. */
-    private final GridKernalContext ctx;
-
-    /** Logger. */
-    private final IgniteLogger log;
-
-    /** Busy lock. */
-    private final GridSpinBusyLock busyLock;
-
-    /** Maximum allowed cursors. */
-    private final int maxCursors;
-
-    /** Current queries cursors. */
-    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>();
-
-    /** Distributed joins flag. */
-    private final boolean distributedJoins;
-
-    /** Enforce join order flag. */
-    private final boolean enforceJoinOrder;
-
-    /**
-     * Constructor.
-     *
-     * @param ctx Context.
-     * @param busyLock Shutdown latch.
-     * @param maxCursors Maximum allowed cursors.
-     * @param distributedJoins Distributed joins flag.
-     * @param enforceJoinOrder Enforce join order flag.
-     */
-    public OdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
-        boolean distributedJoins, boolean enforceJoinOrder) {
-        this.ctx = ctx;
-        this.busyLock = busyLock;
-        this.maxCursors = maxCursors;
-        this.distributedJoins = distributedJoins;
-        this.enforceJoinOrder = enforceJoinOrder;
-
-        log = ctx.log(getClass());
-    }
-
-    /** {@inheritDoc} */
-    @Override public SqlListenerResponse handle(SqlListenerRequest req) {
-        assert req != null;
-
-        if (!busyLock.enterBusy())
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
-                    "Failed to handle ODBC request because node is stopping: " + req);
-
-        try {
-            switch (req.command()) {
-                case QRY_EXEC:
-                    return executeQuery((SqlListenerQueryExecuteRequest)req);
-
-                case QRY_FETCH:
-                    return fetchQuery((SqlListenerQueryFetchRequest)req);
-
-                case QRY_CLOSE:
-                    return closeQuery((SqlListenerQueryCloseRequest)req);
-
-                case META_COLS:
-                    return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req);
-
-                case META_TBLS:
-                    return getTablesMeta((OdbcQueryGetTablesMetaRequest)req);
-
-                case META_PARAMS:
-                    return getParamsMeta((OdbcQueryGetParamsMetaRequest)req);
-            }
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported ODBC request: " + req);
-        }
-        finally {
-            busyLock.leaveBusy();
-        }
-    }
-
-    /**
-     * {@link SqlListenerQueryExecuteRequest} command handler.
-     *
-     * @param req Execute query request.
-     * @return Response.
-     */
-    private SqlListenerResponse executeQuery(SqlListenerQueryExecuteRequest req) {
-        int cursorCnt = qryCursors.size();
-
-        if (maxCursors > 0 && cursorCnt >= maxCursors)
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
-                "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
-                "[maximum=" + maxCursors + ", current=" + cursorCnt + ']');
-
-        long qryId = QRY_ID_GEN.getAndIncrement();
-
-        try {
-            String sql = OdbcEscapeUtils.parse(req.sqlQuery());
-
-            if (log.isDebugEnabled())
-                log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() +
-                    ", parsed=" + sql + ']');
-
-            SqlFieldsQuery qry = new SqlFieldsQuery(sql);
-
-            qry.setArgs(req.arguments());
-
-            qry.setDistributedJoins(distributedJoins);
-            qry.setEnforceJoinOrder(enforceJoinOrder);
-
-            IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.cacheName());
-
-            if (cache0 == null)
-                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
-                    "Cache doesn't exist (did you configure it?): " + req.cacheName());
-
-            IgniteCache<Object, Object> cache = cache0.withKeepBinary();
-
-            if (cache == null)
-                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
-                    "Can not get cache with keep binary: " + req.cacheName());
-
-            QueryCursor qryCur = cache.query(qry);
-
-            qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null));
-
-            List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
-
-            SqlListenerQueryExecuteResult res = new SqlListenerQueryExecuteResult(qryId, convertMetadata(fieldsMeta));
-
-            return new SqlListenerResponse(res);
-        }
-        catch (Exception e) {
-            qryCursors.remove(qryId);
-
-            U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
-        }
-    }
-
-    /**
-     * {@link SqlListenerQueryCloseRequest} command handler.
-     *
-     * @param req Execute query request.
-     * @return Response.
-     */
-    private SqlListenerResponse closeQuery(SqlListenerQueryCloseRequest req) {
-        try {
-            IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());
-
-            if (tuple == null)
-                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
-                    "Failed to find query with ID: " + req.queryId());
-
-            QueryCursor cur = tuple.get1();
-
-            assert(cur != null);
-
-            cur.close();
-
-            qryCursors.remove(req.queryId());
-
-            SqlListenerQueryCloseResult res = new SqlListenerQueryCloseResult(req.queryId());
-
-            return new SqlListenerResponse(res);
-        }
-        catch (Exception e) {
-            qryCursors.remove(req.queryId());
-
-            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e);
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
-        }
-    }
-
-    /**
-     * {@link SqlListenerQueryFetchRequest} command handler.
-     *
-     * @param req Execute query request.
-     * @return Response.
-     */
-    private SqlListenerResponse fetchQuery(SqlListenerQueryFetchRequest req) {
-        try {
-            IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());
-
-            if (tuple == null)
-                return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED,
-                    "Failed to find query with ID: " + req.queryId());
-
-            Iterator iter = tuple.get2();
-
-            if (iter == null) {
-                QueryCursor cur = tuple.get1();
-
-                iter = cur.iterator();
-
-                tuple.put(cur, iter);
-            }
-
-            List<Object> items = new ArrayList<>();
-
-            for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i)
-                items.add(iter.next());
-
-            SqlListenerQueryFetchResult res = new SqlListenerQueryFetchResult(req.queryId(), items, !iter.hasNext());
-
-            return new SqlListenerResponse(res);
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
-        }
-    }
-
-    /**
-     * {@link OdbcQueryGetColumnsMetaRequest} command handler.
-     *
-     * @param req Get columns metadata request.
-     * @return Response.
-     */
-    private SqlListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) {
-        try {
-            List<SqlListenerColumnMeta> meta = new ArrayList<>();
-
-            String cacheName;
-            String tableName;
-
-            if (req.tableName().contains(".")) {
-                // Parsing two-part table name.
-                String[] parts = req.tableName().split("\\.");
-
-                cacheName = OdbcUtils.removeQuotationMarksIfNeeded(parts[0]);
-
-                tableName = parts[1];
-            }
-            else {
-                cacheName = OdbcUtils.removeQuotationMarksIfNeeded(req.cacheName());
-
-                tableName = req.tableName();
-            }
-
-            Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
-
-            for (GridQueryTypeDescriptor table : tablesMeta) {
-                if (!matches(table.name(), tableName))
-                    continue;
-
-                for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
-                    if (!matches(field.getKey(), req.columnName()))
-                        continue;
-
-                    SqlListenerColumnMeta columnMeta = new SqlListenerColumnMeta(req.cacheName(), table.name(),
-                        field.getKey(), field.getValue());
-
-                    if (!meta.contains(columnMeta))
-                        meta.add(columnMeta);
-                }
-            }
-
-            OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta);
-
-            return new SqlListenerResponse(res);
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
-        }
-    }
-
-    /**
-     * {@link OdbcQueryGetTablesMetaRequest} command handler.
-     *
-     * @param req Get tables metadata request.
-     * @return Response.
-     */
-    private SqlListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) {
-        try {
-            List<OdbcTableMeta> meta = new ArrayList<>();
-
-            String realSchema = OdbcUtils.removeQuotationMarksIfNeeded(req.schema());
-
-            for (String cacheName : ctx.cache().cacheNames())
-            {
-                if (!matches(cacheName, realSchema))
-                    continue;
-
-                Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
-
-                for (GridQueryTypeDescriptor table : tablesMeta) {
-                    if (!matches(table.name(), req.table()))
-                        continue;
-
-                    if (!matches("TABLE", req.tableType()))
-                        continue;
-
-                    OdbcTableMeta tableMeta = new OdbcTableMeta(null, cacheName, table.name(), "TABLE");
-
-                    if (!meta.contains(tableMeta))
-                        meta.add(tableMeta);
-                }
-            }
-
-            OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta);
-
-            return new SqlListenerResponse(res);
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
-        }
-    }
-
-    /**
-     * {@link OdbcQueryGetParamsMetaRequest} command handler.
-     *
-     * @param req Get params metadata request.
-     * @return Response.
-     */
-    private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) {
-        try {
-            PreparedStatement stmt = ctx.query().prepareNativeStatement(req.cacheName(), req.query());
-
-            ParameterMetaData pmd = stmt.getParameterMetaData();
-
-            byte[] typeIds = new byte[pmd.getParameterCount()];
-
-            for (int i = 1; i <= pmd.getParameterCount(); ++i) {
-                int sqlType = pmd.getParameterType(i);
-
-                typeIds[i - 1] = sqlTypeToBinary(sqlType);
-            }
-
-            OdbcQueryGetParamsMetaResult res = new OdbcQueryGetParamsMetaResult(typeIds);
-
-            return new SqlListenerResponse(res);
-        }
-        catch (Exception e) {
-            U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
-
-            return new SqlListenerResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
-        }
-    }
-
-    /**
-     * Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants).
-     *
-     * @param sqlType SQL type.
-     * @return Binary type.
-     */
-    private static byte sqlTypeToBinary(int sqlType) {
-        switch (sqlType) {
-            case Types.BIGINT:
-                return GridBinaryMarshaller.LONG;
-
-            case Types.BOOLEAN:
-                return GridBinaryMarshaller.BOOLEAN;
-
-            case Types.DATE:
-                return GridBinaryMarshaller.DATE;
-
-            case Types.DOUBLE:
-                return GridBinaryMarshaller.DOUBLE;
-
-            case Types.FLOAT:
-            case Types.REAL:
-                return GridBinaryMarshaller.FLOAT;
-
-            case Types.NUMERIC:
-            case Types.DECIMAL:
-                return GridBinaryMarshaller.DECIMAL;
-
-            case Types.INTEGER:
-                return GridBinaryMarshaller.INT;
-
-            case Types.SMALLINT:
-                return GridBinaryMarshaller.SHORT;
-
-            case Types.TIME:
-                return GridBinaryMarshaller.TIME;
-
-            case Types.TIMESTAMP:
-                return GridBinaryMarshaller.TIMESTAMP;
-
-            case Types.TINYINT:
-                return GridBinaryMarshaller.BYTE;
-
-            case Types.CHAR:
-            case Types.VARCHAR:
-            case Types.LONGNVARCHAR:
-                return GridBinaryMarshaller.STRING;
-
-            case Types.NULL:
-                return GridBinaryMarshaller.NULL;
-
-            case Types.BINARY:
-            case Types.VARBINARY:
-            case Types.LONGVARBINARY:
-            default:
-                return GridBinaryMarshaller.BYTE_ARR;
-        }
-    }
-
-    /**
-     * Convert metadata in collection from {@link GridQueryFieldMetadata} to
-     * {@link SqlListenerColumnMeta}.
-     *
-     * @param meta Internal query field metadata.
-     * @return Odbc query field metadata.
-     */
-    private static Collection<SqlListenerColumnMeta> convertMetadata(Collection<?> meta) {
-        List<SqlListenerColumnMeta> res = new ArrayList<>();
-
-        if (meta != null) {
-            for (Object info : meta) {
-                assert info instanceof GridQueryFieldMetadata;
-
-                res.add(new SqlListenerColumnMeta((GridQueryFieldMetadata)info));
-            }
-        }
-
-        return res;
-    }
-
-    /**
-     * Checks whether string matches SQL pattern.
-     *
-     * @param str String.
-     * @param ptrn Pattern.
-     * @return Whether string matches pattern.
-     */
-    private static boolean matches(String str, String ptrn) {
-        return str != null && (F.isEmpty(ptrn) ||
-            str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", ".")));
-    }
-}


Mime
View raw message