ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [07/50] [abbrv] ignite git commit: IGNITE-5176: JDBC thin driver: implemented query execution. This closes #1994. This closes #2040.
Date Tue, 06 Jun 2017 08:59:25 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
new file mode 100644
index 0000000..b319293
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcRequestHandler.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler;
+import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_CLOSE;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_EXEC;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_FETCH;
+import static org.apache.ignite.internal.processors.odbc.jdbc.JdbcRequest.QRY_META;
+
+/**
+ * SQL query handler.
+ */
+public class JdbcRequestHandler implements SqlListenerRequestHandler {
+    /** Query ID sequence. */
+    private static final AtomicLong QRY_ID_GEN = new AtomicLong();
+
+    /** Kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock;
+
+    /** Maximum allowed cursors. */
+    private final int maxCursors;
+
+    /** Current queries cursors. */
+    private final ConcurrentHashMap<Long, JdbcQueryCursor> qryCursors = new ConcurrentHashMap<>();
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param busyLock Shutdown latch.
+     * @param maxCursors Maximum allowed cursors.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     */
+    public JdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
+        boolean distributedJoins, boolean enforceJoinOrder) {
+        this.ctx = ctx;
+        this.busyLock = busyLock;
+        this.maxCursors = maxCursors;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder = enforceJoinOrder;
+
+        log = ctx.log(getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlListenerResponse handle(SqlListenerRequest req0) {
+        assert req0 != null;
+
+        assert req0 instanceof JdbcRequest;
+
+        JdbcRequest req = (JdbcRequest)req0;
+
+        if (!busyLock.enterBusy())
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, 
+                "Failed to handle JDBC request because node is stopping.");
+
+        try {
+            switch (req.type()) {
+                case QRY_EXEC:
+                    return executeQuery((JdbcQueryExecuteRequest)req);
+
+                case QRY_FETCH:
+                    return fetchQuery((JdbcQueryFetchRequest)req);
+
+                case QRY_CLOSE:
+                    return closeQuery((JdbcQueryCloseRequest)req);
+
+                case QRY_META:
+                    return getQueryMeta((JdbcQueryMetadataRequest)req);
+            }
+
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, "Unsupported JDBC request [req=" + req + ']');
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlListenerResponse handleException(Exception e) {
+        return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+    }
+
+    /**
+     * {@link JdbcQueryExecuteRequest} command handler.
+     *
+     * @param req Execute query request.
+     * @return Response.
+     */
+    @SuppressWarnings("unchecked")
+    private JdbcResponse executeQuery(JdbcQueryExecuteRequest req) {
+        int cursorCnt = qryCursors.size();
+
+        if (maxCursors > 0 && cursorCnt >= maxCursors)
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
+                "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
+                "[maximum=" + maxCursors + ", current=" + cursorCnt + ']');
+
+        long qryId = QRY_ID_GEN.getAndIncrement();
+
+        try {
+            String sql = req.sqlQuery();
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+            qry.setArgs(req.arguments());
+
+            qry.setDistributedJoins(distributedJoins);
+            qry.setEnforceJoinOrder(enforceJoinOrder);
+
+            if (req.pageSize() <= 0)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');
+
+            qry.setPageSize(req.pageSize());
+
+            IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.schemaName());
+
+            if (cache0 == null)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Cache doesn't exist (did you configure it?): " + req.schemaName());
+
+            IgniteCache<Object, Object> cache = cache0.withKeepBinary();
+
+            if (cache == null)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Can not get cache with keep binary: " + req.schemaName());
+
+            JdbcQueryCursor cur = new JdbcQueryCursor(
+                qryId, req.pageSize(), req.maxRows(), (QueryCursorImpl)cache.query(qry));
+
+            qryCursors.put(qryId, cur);
+
+            JdbcQueryExecuteResult res;
+
+            if (cur.isQuery())
+                res = new JdbcQueryExecuteResult(qryId, cur.fetchRows(), !cur.hasNext());
+            else {
+                List<List<Object>> items = cur.fetchRows();
+
+                assert items != null && items.size() == 1 && items.get(0).size() == 1
+                    && items.get(0).get(0) instanceof Long :
+                    "Invalid result set for not-SELECT query. [qry=" + sql +
+                        ", res=" + S.toString(List.class, items) + ']';
+
+                res = new JdbcQueryExecuteResult(qryId, (Long)items.get(0).get(0));
+            }
+
+            return new JdbcResponse(res);
+        }
+        catch (Exception e) {
+            qryCursors.remove(qryId);
+
+            U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link JdbcQueryCloseRequest} command handler.
+     *
+     * @param req Execute query request.
+     * @return Response.
+     */
+    private JdbcResponse closeQuery(JdbcQueryCloseRequest req) {
+        try {
+            JdbcQueryCursor cur = qryCursors.remove(req.queryId());
+
+            if (cur == null)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Failed to find query cursor with ID: " + req.queryId());
+
+            cur.close();
+
+            return new JdbcResponse(null);
+        }
+        catch (Exception e) {
+            qryCursors.remove(req.queryId());
+
+            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e);
+
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link JdbcQueryFetchRequest} command handler.
+     *
+     * @param req Execute query request.
+     * @return Response.
+     */
+    private JdbcResponse fetchQuery(JdbcQueryFetchRequest req) {
+        try {
+            JdbcQueryCursor cur = qryCursors.get(req.queryId());
+
+            if (cur == null)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Failed to find query cursor with ID: " + req.queryId());
+
+            if (req.pageSize() <= 0)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Invalid fetch size : [fetchSize=" + req.pageSize() + ']');
+
+            cur.pageSize(req.pageSize());
+
+            JdbcQueryFetchResult res = new JdbcQueryFetchResult(cur.fetchRows(), !cur.hasNext());
+
+            return new JdbcResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * @param req Request.
+     * @return Response.
+     */
+    private JdbcResponse getQueryMeta(JdbcQueryMetadataRequest req) {
+        try {
+            JdbcQueryCursor cur = qryCursors.get(req.queryId());
+
+            if (cur == null)
+                return new JdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Failed to find query with ID: " + req.queryId());
+
+            JdbcQueryMetadataResult res = new JdbcQueryMetadataResult(req.queryId(),
+                cur.meta());
+
+            return new JdbcResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new JdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java
new file mode 100644
index 0000000..f039db7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResponse.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SQL listener response.
+ */
+public class JdbcResponse extends SqlListenerResponse implements JdbcRawBinarylizable {
+    /** Response object. */
+    @GridToStringInclude
+    private JdbcResult res;
+
+    /**
+     * Default constructs is used for deserialization
+     */
+    public JdbcResponse() {
+        super(-1, null);
+    }
+
+    /**
+     * Constructs successful rest response.
+     *
+     * @param res Response result.
+     */
+    public JdbcResponse(JdbcResult res) {
+        super(STATUS_SUCCESS, null);
+
+        this.res = res;
+    }
+
+    /**
+     * Constructs failed rest response.
+     *
+     * @param status Response status.
+     * @param err Error, {@code null} if success is {@code true}.
+     */
+    public JdbcResponse(int status, @Nullable String err) {
+        super(status, err);
+
+        assert status != STATUS_SUCCESS;
+    }
+
+    /**
+     * @return Response object.
+     */
+    public JdbcResult response() {
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(JdbcResponse.class, this, "status", status(),"err", error());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        writer.writeInt(status());
+
+        if (status() == STATUS_SUCCESS) {
+            writer.writeBoolean(res != null);
+
+            if (res != null)
+                res.writeBinary(writer);
+        }
+        else
+            writer.writeString(error());
+
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        status(reader.readInt());
+
+        if (status() == STATUS_SUCCESS) {
+            if (reader.readBoolean())
+                res = JdbcResult.readResult(reader);
+        }
+        else
+            error(reader.readString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
new file mode 100644
index 0000000..2d7666e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcResult.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+
+/**
+ * SQL listener response.
+ */
+public class JdbcResult implements JdbcRawBinarylizable {
+    /** Execute sql result. */
+    public static final byte QRY_EXEC = 2;
+
+    /** Fetch query results. */
+    public static final byte QRY_FETCH = 3;
+
+    /** Get columns meta query result. */
+    public static final byte QRY_META = 4;
+
+    /** Success status. */
+    private byte type;
+
+    /**
+     * Constructs result.
+     *
+     * @param type Type of results.
+     */
+    public JdbcResult(byte type) {
+        this.type = type;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeBinary(BinaryWriterExImpl writer) throws BinaryObjectException {
+        writer.writeByte(type);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readBinary(BinaryReaderExImpl reader) throws BinaryObjectException {
+        // No-op.
+    }
+
+    /**
+     * @param reader Binary reader.
+     * @return Request object.
+     * @throws BinaryObjectException On error.
+     */
+    public static JdbcResult readResult(BinaryReaderExImpl reader) throws BinaryObjectException {
+        int resId = reader.readByte();
+
+        JdbcResult res;
+
+        switch(resId) {
+            case QRY_EXEC:
+                res = new JdbcQueryExecuteResult();
+                break;
+
+            case QRY_FETCH:
+                res = new JdbcQueryFetchResult();
+                break;
+
+            case QRY_META:
+                res = new JdbcQueryMetadataResult();
+                break;
+
+            default:
+                throw new IgniteException("Unknown SQL listener request ID: [request ID=" + resId + ']');
+        }
+
+        res.readBinary(reader);
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
new file mode 100644
index 0000000..65efbf5
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/jdbc/JdbcUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.jdbc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
+
+/**
+ * Various JDBC utility methods.
+ */
+public class JdbcUtils {
+    /**
+     * @param writer Binari writer.
+     * @param items Query results items.
+     */
+    public static void writeItems(BinaryWriterExImpl writer, List<List<Object>> items) {
+        writer.writeInt(items.size());
+
+        for (List<Object> row : items) {
+            if (row != null) {
+                writer.writeInt(row.size());
+
+                for (Object obj : row)
+                    SqlListenerUtils.writeObject(writer, obj, false);
+            }
+        }
+    }
+
+    /**
+     * @param reader Binary reader.
+     * @return Query results items.
+     */
+    public static List<List<Object>> readItems(BinaryReaderExImpl reader) {
+        int rowsSize = reader.readInt();
+
+        if (rowsSize > 0) {
+            List<List<Object>> items = new ArrayList<>(rowsSize);
+
+            for (int i = 0; i < rowsSize; ++i) {
+                int colsSize = reader.readInt();
+
+                List<Object> col = new ArrayList<>(colsSize);
+
+                for (int colCnt = 0; colCnt < colsSize; ++colCnt)
+                    col.add(SqlListenerUtils.readObject(reader, false));
+
+                items.add(col);
+            }
+
+            return items;
+        } else
+            return Collections.emptyList();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java
new file mode 100644
index 0000000..d9d39de
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcColumnMeta.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.binary.BinaryRawWriter;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.processors.odbc.OdbcUtils;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+
+/**
+ * SQL listener column metadata.
+ */
+public class OdbcColumnMeta {
+    /** Cache name. */
+    private final String schemaName;
+
+    /** Table name. */
+    private final String tableName;
+
+    /** Column name. */
+    private final String columnName;
+
+    /** Data type. */
+    private final Class<?> dataType;
+
+    /**
+     * @param schemaName Cache name.
+     * @param tableName Table name.
+     * @param columnName Column name.
+     * @param dataType Data type.
+     */
+    public OdbcColumnMeta(String schemaName, String tableName, String columnName, Class<?> dataType) {
+        this.schemaName = OdbcUtils.addQuotationMarksIfNeeded(schemaName);
+        this.tableName = tableName;
+        this.columnName = columnName;
+        this.dataType = dataType;
+    }
+
+    /**
+     * @param info Field metadata.
+     */
+    public OdbcColumnMeta(GridQueryFieldMetadata info) {
+        this.schemaName = OdbcUtils.addQuotationMarksIfNeeded(info.schemaName());
+        this.tableName = info.typeName();
+        this.columnName = info.fieldName();
+
+        Class<?> type;
+
+        try {
+            type = Class.forName(info.fieldTypeName());
+        }
+        catch (Exception ignored) {
+            type = Object.class;
+        }
+
+        this.dataType = type;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        int hash = schemaName.hashCode();
+
+        hash = 31 * hash + tableName.hashCode();
+        hash = 31 * hash + columnName.hashCode();
+        hash = 31 * hash + dataType.hashCode();
+
+        return hash;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        if (o instanceof OdbcColumnMeta) {
+            OdbcColumnMeta other = (OdbcColumnMeta) o;
+
+            return this == other || schemaName.equals(other.schemaName) && tableName.equals(other.tableName) &&
+                columnName.equals(other.columnName) && dataType.equals(other.dataType);
+        }
+
+        return false;
+    }
+
+    /**
+     * Write in a binary format.
+     *
+     * @param writer Binary writer.
+     */
+    public void write(BinaryRawWriter writer) {
+        writer.writeString(schemaName);
+        writer.writeString(tableName);
+        writer.writeString(columnName);
+
+        byte typeId = BinaryUtils.typeByClass(dataType);
+
+        writer.writeByte(typeId);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
index 300385f..02e8676 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcMessageParser.java
@@ -17,7 +17,9 @@
 
 package org.apache.ignite.internal.processors.odbc.odbc;
 
+import java.util.Collection;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.BinaryThreadLocalContext;
@@ -27,20 +29,34 @@ import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
-import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractMessageParser;
+import org.apache.ignite.internal.processors.odbc.SqlListenerMessageParser;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
+import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
+import org.apache.ignite.internal.processors.odbc.SqlListenerUtils;
 
 /**
  * JDBC message parser.
  */
-public class OdbcMessageParser extends SqlListenerAbstractMessageParser {
+public class OdbcMessageParser implements SqlListenerMessageParser {
     /** Marshaller. */
     private final GridBinaryMarshaller marsh;
 
+    /** Initial output stream capacity. */
+    protected static final int INIT_CAP = 1024;
+
+    /** Kernal context. */
+    protected GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
     /**
      * @param ctx Context.
      */
     public OdbcMessageParser(GridKernalContext ctx) {
-        super(ctx, new OdbcObjectReader(), new OdbcObjectWriter());
+        this.ctx = ctx;
+
+        log = ctx.log(getClass());
 
         if (ctx.cacheObjects() instanceof CacheObjectBinaryProcessorImpl) {
             CacheObjectBinaryProcessorImpl cacheObjProc = (CacheObjectBinaryProcessorImpl)ctx.cacheObjects();
@@ -54,15 +70,198 @@ public class OdbcMessageParser extends SqlListenerAbstractMessageParser {
     }
 
     /** {@inheritDoc} */
-    @Override protected BinaryReaderExImpl createReader(byte[] msg) {
+    @Override public SqlListenerRequest decode(byte[] msg) {
+        assert msg != null;
+
         BinaryInputStream stream = new BinaryHeapInputStream(msg);
 
-        return new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true);
+        BinaryReaderExImpl reader = new BinaryReaderExImpl(marsh.context(), stream, ctx.config().getClassLoader(), true);
+
+        byte cmd = reader.readByte();
+
+        SqlListenerRequest res;
+
+        switch (cmd) {
+            case OdbcRequest.QRY_EXEC: {
+                String cache = reader.readString();
+                String sql = reader.readString();
+                int argsNum = reader.readInt();
+
+                Object[] params = new Object[argsNum];
+
+                for (int i = 0; i < argsNum; ++i)
+                    params[i] = SqlListenerUtils.readObject(reader, true);
+
+                res = new OdbcQueryExecuteRequest(cache, sql, params);
+
+                break;
+            }
+
+            case OdbcRequest.QRY_FETCH: {
+                long queryId = reader.readLong();
+                int pageSize = reader.readInt();
+
+                res = new OdbcQueryFetchRequest(queryId, pageSize);
+
+                break;
+            }
+
+            case OdbcRequest.QRY_CLOSE: {
+                long queryId = reader.readLong();
+
+                res = new OdbcQueryCloseRequest(queryId);
+
+                break;
+            }
+
+            case OdbcRequest.META_COLS: {
+                String cache = reader.readString();
+                String table = reader.readString();
+                String column = reader.readString();
+
+                res = new OdbcQueryGetColumnsMetaRequest(cache, table, column);
+
+                break;
+            }
+
+            case OdbcRequest.META_TBLS: {
+                String catalog = reader.readString();
+                String schema = reader.readString();
+                String table = reader.readString();
+                String tableType = reader.readString();
+
+                res = new OdbcQueryGetTablesMetaRequest(catalog, schema, table, tableType);
+
+                break;
+            }
+
+            case OdbcRequest.META_PARAMS: {
+                String cacheName = reader.readString();
+                String sqlQuery = reader.readString();
+
+                res = new OdbcQueryGetParamsMetaRequest(cacheName, sqlQuery);
+
+                break;
+            }
+
+            default:
+                throw new IgniteException("Unknown ODBC command: [cmd=" + cmd + ']');
+        }
+
+        return res;
     }
 
     /** {@inheritDoc} */
-    @Override protected BinaryWriterExImpl createWriter(int cap) {
-        return new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(cap),
+    @Override public byte[] encode(SqlListenerResponse msg0) {
+        assert msg0 != null;
+
+        assert msg0 instanceof OdbcResponse;
+
+        OdbcResponse msg = (OdbcResponse)msg0;
+
+        // Creating new binary writer
+        BinaryWriterExImpl writer = new BinaryWriterExImpl(marsh.context(), new BinaryHeapOutputStream(INIT_CAP),
             BinaryThreadLocalContext.get().schemaHolder(), null);
+
+        // Writing status.
+        writer.writeByte((byte) msg.status());
+
+        if (msg.status() != SqlListenerResponse.STATUS_SUCCESS) {
+            writer.writeString(msg.error());
+
+            return writer.array();
+        }
+
+        Object res0 = msg.response();
+
+        if (res0 == null)
+            return writer.array();
+        else if (res0 instanceof OdbcQueryExecuteResult) {
+            OdbcQueryExecuteResult res = (OdbcQueryExecuteResult) res0;
+
+            if (log.isDebugEnabled())
+                log.debug("Resulting query ID: " + res.getQueryId());
+
+            writer.writeLong(res.getQueryId());
+
+            Collection<OdbcColumnMeta> metas = res.getColumnsMetadata();
+
+            assert metas != null;
+
+            writer.writeInt(metas.size());
+
+            for (OdbcColumnMeta meta : metas)
+                meta.write(writer);
+        }
+        else if (res0 instanceof OdbcQueryFetchResult) {
+            OdbcQueryFetchResult res = (OdbcQueryFetchResult) res0;
+
+            if (log.isDebugEnabled())
+                log.debug("Resulting query ID: " + res.queryId());
+
+            writer.writeLong(res.queryId());
+
+            Collection<?> items0 = res.items();
+
+            assert items0 != null;
+
+            writer.writeBoolean(res.last());
+
+            writer.writeInt(items0.size());
+
+            for (Object row0 : items0) {
+                if (row0 != null) {
+                    Collection<?> row = (Collection<?>)row0;
+
+                    writer.writeInt(row.size());
+
+                    for (Object obj : row)
+                        SqlListenerUtils.writeObject(writer, obj, true);
+                }
+            }
+        }
+        else if (res0 instanceof OdbcQueryCloseResult) {
+            OdbcQueryCloseResult res = (OdbcQueryCloseResult) res0;
+
+            if (log.isDebugEnabled())
+                log.debug("Resulting query ID: " + res.getQueryId());
+
+            writer.writeLong(res.getQueryId());
+        }
+        else if (res0 instanceof OdbcQueryGetColumnsMetaResult) {
+            OdbcQueryGetColumnsMetaResult res = (OdbcQueryGetColumnsMetaResult) res0;
+
+            Collection<OdbcColumnMeta> columnsMeta = res.meta();
+
+            assert columnsMeta != null;
+
+            writer.writeInt(columnsMeta.size());
+
+            for (OdbcColumnMeta columnMeta : columnsMeta)
+                columnMeta.write(writer);
+        }
+        else if (res0 instanceof OdbcQueryGetTablesMetaResult) {
+            OdbcQueryGetTablesMetaResult res = (OdbcQueryGetTablesMetaResult) res0;
+
+            Collection<OdbcTableMeta> tablesMeta = res.meta();
+
+            assert tablesMeta != null;
+
+            writer.writeInt(tablesMeta.size());
+
+            for (OdbcTableMeta tableMeta : tablesMeta)
+                tableMeta.writeBinary(writer);
+        }
+        else if (res0 instanceof OdbcQueryGetParamsMetaResult) {
+            OdbcQueryGetParamsMetaResult res = (OdbcQueryGetParamsMetaResult) res0;
+
+            byte[] typeIds = res.typeIds();
+
+            SqlListenerUtils.writeObject(writer, typeIds, true);
+        }
+        else
+            assert false : "Should not reach here.";
+
+        return writer.array();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java
deleted file mode 100644
index 586fbc5..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectReader.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc.odbc;
-
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectReader;
-
-/**
- * Binary reader with marshaling non-primitive and non-embedded objects with JDK marshaller.
- */
-@SuppressWarnings("unchecked")
-public class OdbcObjectReader extends SqlListenerAbstractObjectReader {
-    /** {@inheritDoc} */
-    @Override protected Object readCustomObject(BinaryReaderExImpl reader) throws BinaryObjectException {
-        return reader.readObjectDetached();
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java
deleted file mode 100644
index c2f3aba..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcObjectWriter.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.odbc.odbc;
-
-import org.apache.ignite.binary.BinaryObjectException;
-import org.apache.ignite.internal.binary.BinaryWriterExImpl;
-import org.apache.ignite.internal.processors.odbc.SqlListenerAbstractObjectWriter;
-
-/**
- * Binary writer with marshaling non-primitive and non-embedded objects with JDK marshaller..
- */
-public class OdbcObjectWriter extends SqlListenerAbstractObjectWriter {
-    /** {@inheritDoc} */
-    @Override protected void writeCustomObject(BinaryWriterExImpl writer, Object obj) throws BinaryObjectException {
-        writer.writeObjectDetached(obj);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java
new file mode 100644
index 0000000..a9decb7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * SQL listener query close request.
+ */
+public class OdbcQueryCloseRequest extends OdbcRequest {
+    /** Query ID. */
+    private final long queryId;
+
+    /**
+     * @param queryId Query ID.
+     */
+    public OdbcQueryCloseRequest(long queryId) {
+        super(QRY_CLOSE);
+
+        this.queryId = queryId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return queryId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OdbcQueryCloseRequest.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java
new file mode 100644
index 0000000..eb156fa
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryCloseResult.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+/**
+ * SQL listener query close result.
+ */
+public class OdbcQueryCloseResult {
+    /** Query ID. */
+    private final long queryId;
+
+    /**
+     * @param queryId Query ID.
+     */
+    public OdbcQueryCloseResult(long queryId){
+        this.queryId = queryId;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long getQueryId() {
+        return queryId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java
new file mode 100644
index 0000000..dd674d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteRequest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * SQL listener query execute request.
+ */
+public class OdbcQueryExecuteRequest extends OdbcRequest {
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Sql query. */
+    @GridToStringInclude(sensitive = true)
+    private final String sqlQry;
+
+    /** Sql query arguments. */
+    @GridToStringExclude
+    private final Object[] args;
+
+    /**
+     * @param cacheName Cache name.
+     * @param sqlQry SQL query.
+     * @param args Arguments list.
+     */
+    public OdbcQueryExecuteRequest(String cacheName, String sqlQry, Object[] args) {
+        super(QRY_EXEC);
+
+        this.cacheName = cacheName.isEmpty() ? null : cacheName;
+        this.sqlQry = sqlQry;
+        this.args = args;
+    }
+
+    /**
+     * @return Sql query.
+     */
+    public String sqlQuery() {
+        return sqlQry;
+    }
+
+    /**
+     * @return Sql query arguments.
+     */
+    public Object[] arguments() {
+        return args;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    @Nullable public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OdbcQueryExecuteRequest.class, this, "args", args, true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java
new file mode 100644
index 0000000..de5a8fd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryExecuteResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import java.util.Collection;
+
+/**
+ * SQL listener query execute result.
+ */
+public class OdbcQueryExecuteResult {
+    /** Query ID. */
+    private final long queryId;
+
+    /** Fields metadata. */
+    private final Collection<OdbcColumnMeta> columnsMeta;
+
+    /**
+     * @param queryId Query ID.
+     * @param columnsMeta Columns metadata.
+     */
+    public OdbcQueryExecuteResult(long queryId, Collection<OdbcColumnMeta> columnsMeta) {
+        this.queryId = queryId;
+        this.columnsMeta = columnsMeta;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long getQueryId() {
+        return queryId;
+    }
+
+    /**
+     * @return Columns metadata.
+     */
+    public Collection<OdbcColumnMeta> getColumnsMetadata() {
+        return columnsMeta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java
new file mode 100644
index 0000000..190fffd
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * SQL listener query fetch request.
+ */
+public class OdbcQueryFetchRequest extends OdbcRequest {
+    /** Query ID. */
+    private final long queryId;
+
+    /** Page size - maximum number of rows to return. */
+    private final int pageSize;
+
+    /**
+     * @param queryId Query ID.
+     * @param pageSize Page size.
+     */
+    public OdbcQueryFetchRequest(long queryId, int pageSize) {
+        super(QRY_FETCH);
+
+        this.queryId = queryId;
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return queryId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OdbcQueryFetchRequest.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java
new file mode 100644
index 0000000..f8075f3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryFetchResult.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import java.util.Collection;
+
+/**
+ * SQL listener query fetch result.
+ */
+public class OdbcQueryFetchResult {
+    /** Query ID. */
+    private final long queryId;
+
+    /** Query result rows. */
+    private final Collection<?> items;
+
+    /** Flag indicating the query has no unfetched results. */
+    private final boolean last;
+
+    /**
+     * @param queryId Query ID.
+     * @param items Query result rows.
+     * @param last Flag indicating the query has no unfetched results.
+     */
+    public OdbcQueryFetchResult(long queryId, Collection<?> items, boolean last){
+        this.queryId = queryId;
+        this.items = items;
+        this.last = last;
+    }
+
+    /**
+     * @return Query ID.
+     */
+    public long queryId() {
+        return queryId;
+    }
+
+    /**
+     * @return Query result rows.
+     */
+    public Collection<?> items() {
+        return items;
+    }
+
+    /**
+     * @return Flag indicating the query has no unfetched results.
+     */
+    public boolean last() {
+        return last;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java
new file mode 100644
index 0000000..b60a9e3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaRequest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * ODBC query get columns meta request.
+ */
+public class OdbcQueryGetColumnsMetaRequest extends OdbcRequest {
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Table name. */
+    private final String tableName;
+
+    /** Column name. */
+    private final String columnName;
+
+    /**
+     * @param cacheName Cache name.
+     * @param tableName Table name.
+     * @param columnName Column name.
+     */
+    public OdbcQueryGetColumnsMetaRequest(String cacheName, String tableName, String columnName) {
+        super(META_COLS);
+
+        this.cacheName = cacheName;
+        this.tableName = tableName;
+        this.columnName = columnName;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    @Nullable public String cacheName() {
+        return cacheName;
+    }
+
+    /**
+     * @return Table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * @return Column name.
+     */
+    public String columnName() {
+        return columnName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OdbcQueryGetColumnsMetaRequest.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java
new file mode 100644
index 0000000..7dbf7d8
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetColumnsMetaResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import java.util.Collection;
+
+/**
+ * Query get columns meta result.
+ */
+public class OdbcQueryGetColumnsMetaResult {
+    /** Query result rows. */
+    private final Collection<OdbcColumnMeta> meta;
+
+    /**
+     * @param meta Column metadata.
+     */
+    public OdbcQueryGetColumnsMetaResult(Collection<OdbcColumnMeta> meta) {
+        this.meta = meta;
+    }
+
+    /**
+     * @return Query result rows.
+     */
+    public Collection<OdbcColumnMeta> meta() {
+        return meta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java
new file mode 100644
index 0000000..1d468b1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaRequest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * ODBC query get params meta request.
+ */
+public class OdbcQueryGetParamsMetaRequest extends OdbcRequest {
+    /** Cache. */
+    private final String cacheName;
+
+    /** Query. */
+    private final String query;
+
+    /**
+     * @param query SQL Query.
+     */
+    public OdbcQueryGetParamsMetaRequest(String cacheName, String query) {
+        super(META_PARAMS);
+
+        this.cacheName = cacheName;
+        this.query = query;
+    }
+
+    /**
+     * @return SQL Query.
+     */
+    public String query() {
+        return query;
+    }
+
+    /**
+     * @return Cache name.
+     */
+    public String cacheName() {
+        return cacheName;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OdbcQueryGetParamsMetaRequest.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java
new file mode 100644
index 0000000..d29b760
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetParamsMetaResult.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+/**
+ * ODBC query get params meta result.
+ */
+public class OdbcQueryGetParamsMetaResult {
+    /** List of parameter type IDs. */
+    private final byte[] typeIds;
+
+    /**
+     * @param typeIds List of parameter type IDs.
+     */
+    public OdbcQueryGetParamsMetaResult(byte[] typeIds) {
+        this.typeIds = typeIds;
+    }
+
+    /**
+     * @return List of parameter type IDs.
+     */
+    public byte[] typeIds() {
+        return typeIds;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java
new file mode 100644
index 0000000..0ebb462
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * ODBC query get tables meta request.
+ */
+public class OdbcQueryGetTablesMetaRequest extends OdbcRequest {
+    /** Catalog search pattern. */
+    private final String catalog;
+
+    /** Schema search pattern. */
+    private final String schema;
+
+    /** Table search pattern. */
+    private final String table;
+
+    /** Table type search pattern. */
+    private final String tableType;
+
+    /**
+     * @param catalog Catalog search pattern.
+     * @param schema Schema search pattern.
+     * @param table Table search pattern.
+     * @param tableType Table type search pattern.
+     */
+    public OdbcQueryGetTablesMetaRequest(String catalog, String schema, String table, String tableType) {
+        super(META_TBLS);
+
+        this.catalog = catalog;
+        this.schema = schema;
+        this.table = table;
+        this.tableType = tableType;
+    }
+
+    /**
+     * @return catalog search pattern.
+     */
+    public String catalog() {
+        return catalog;
+    }
+
+    /**
+     * @return Schema search pattern.
+     */
+    public String schema() {
+        return schema;
+    }
+
+    /**
+     * @return Table search pattern.
+     */
+    public String table() {
+        return table;
+    }
+
+    /**
+     * @return Table type search pattern.
+     */
+    public String tableType() {
+        return tableType;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(OdbcQueryGetTablesMetaRequest.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java
new file mode 100644
index 0000000..6316b2d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcQueryGetTablesMetaResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import java.util.Collection;
+
+/**
+ * Query get columns meta result.
+ */
+public class OdbcQueryGetTablesMetaResult {
+    /** Query result rows. */
+    private final Collection<OdbcTableMeta> meta;
+
+    /**
+     * @param meta Column metadata.
+     */
+    public OdbcQueryGetTablesMetaResult(Collection<OdbcTableMeta> meta) {
+        this.meta = meta;
+    }
+
+    /**
+     * @return Query result rows.
+     */
+    public Collection<OdbcTableMeta> meta() {
+        return meta;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java
new file mode 100644
index 0000000..825e770
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
+
+/**
+ * SQL listener command request.
+ */
+public class OdbcRequest extends SqlListenerRequest {
+    /** Execute sql query. */
+    public static final int QRY_EXEC = 2;
+
+    /** Fetch query results. */
+    public static final int QRY_FETCH = 3;
+
+    /** Close query. */
+    public static final int QRY_CLOSE = 4;
+
+    /** Get columns meta query. */
+    public static final int META_COLS = 5;
+
+    /** Get columns meta query. */
+    public static final int META_TBLS = 6;
+
+    /** Get parameters meta. */
+    public static final int META_PARAMS = 7;
+
+    /** Get parameters meta. */
+    public static final int JDBC_REQ = 8;
+
+    /** Command. */
+    private final int cmd;
+
+    /**
+     * @param cmd Command type.
+     */
+    public OdbcRequest(int cmd) {
+        this.cmd = cmd;
+    }
+
+    /**
+     * @return Command.
+     */
+    public int command() {
+        return cmd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e92707b6/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
new file mode 100644
index 0000000..e1b421b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/odbc/OdbcRequestHandler.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc.odbc;
+
+import java.sql.ParameterMetaData;
+import java.sql.PreparedStatement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.odbc.OdbcUtils;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequest;
+import org.apache.ignite.internal.processors.odbc.SqlListenerRequestHandler;
+import org.apache.ignite.internal.processors.odbc.SqlListenerResponse;
+import org.apache.ignite.internal.processors.odbc.odbc.escape.OdbcEscapeUtils;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_COLS;
+import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_PARAMS;
+import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.META_TBLS;
+import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_CLOSE;
+import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_EXEC;
+import static org.apache.ignite.internal.processors.odbc.odbc.OdbcRequest.QRY_FETCH;
+
+/**
+ * SQL query handler.
+ */
+public class OdbcRequestHandler implements SqlListenerRequestHandler {
+    /** Query ID sequence. */
+    private static final AtomicLong QRY_ID_GEN = new AtomicLong();
+
+    /** Kernel context. */
+    private final GridKernalContext ctx;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Busy lock. */
+    private final GridSpinBusyLock busyLock;
+
+    /** Maximum allowed cursors. */
+    private final int maxCursors;
+
+    /** Current queries cursors. */
+    private final ConcurrentHashMap<Long, IgniteBiTuple<QueryCursor, Iterator>> qryCursors = new ConcurrentHashMap<>();
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /** Enforce join order flag. */
+    private final boolean enforceJoinOrder;
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Context.
+     * @param busyLock Shutdown latch.
+     * @param maxCursors Maximum allowed cursors.
+     * @param distributedJoins Distributed joins flag.
+     * @param enforceJoinOrder Enforce join order flag.
+     */
+    public OdbcRequestHandler(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors,
+        boolean distributedJoins, boolean enforceJoinOrder) {
+        this.ctx = ctx;
+        this.busyLock = busyLock;
+        this.maxCursors = maxCursors;
+        this.distributedJoins = distributedJoins;
+        this.enforceJoinOrder = enforceJoinOrder;
+
+        log = ctx.log(getClass());
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlListenerResponse handle(SqlListenerRequest req0) {
+        assert req0 != null;
+
+        OdbcRequest req = (OdbcRequest)req0;
+
+        if (!busyLock.enterBusy())
+            return new OdbcResponse(OdbcResponse.STATUS_FAILED,
+                    "Failed to handle ODBC request because node is stopping: " + req);
+
+        try {
+            switch (req.command()) {
+                case QRY_EXEC:
+                    return executeQuery((OdbcQueryExecuteRequest)req);
+
+                case QRY_FETCH:
+                    return fetchQuery((OdbcQueryFetchRequest)req);
+
+                case QRY_CLOSE:
+                    return closeQuery((OdbcQueryCloseRequest)req);
+
+                case META_COLS:
+                    return getColumnsMeta((OdbcQueryGetColumnsMetaRequest)req);
+
+                case META_TBLS:
+                    return getTablesMeta((OdbcQueryGetTablesMetaRequest)req);
+
+                case META_PARAMS:
+                    return getParamsMeta((OdbcQueryGetParamsMetaRequest)req);
+            }
+
+            return new OdbcResponse(OdbcResponse.STATUS_FAILED, "Unsupported ODBC request: " + req);
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public SqlListenerResponse handleException(Exception e) {
+        return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+    }
+
+    /**
+     * {@link OdbcQueryExecuteRequest} command handler.
+     *
+     * @param req Execute query request.
+     * @return Response.
+     */
+    private SqlListenerResponse executeQuery(OdbcQueryExecuteRequest req) {
+        int cursorCnt = qryCursors.size();
+
+        if (maxCursors > 0 && cursorCnt >= maxCursors)
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, "Too many opened cursors (either close other " +
+                "opened cursors or increase the limit through OdbcConfiguration.setMaxOpenCursors()) " +
+                "[maximum=" + maxCursors + ", current=" + cursorCnt + ']');
+
+        long qryId = QRY_ID_GEN.getAndIncrement();
+
+        try {
+            String sql = OdbcEscapeUtils.parse(req.sqlQuery());
+
+            if (log.isDebugEnabled())
+                log.debug("ODBC query parsed [reqId=" + req.requestId() + ", original=" + req.sqlQuery() +
+                    ", parsed=" + sql + ']');
+
+            SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+            qry.setArgs(req.arguments());
+
+            qry.setDistributedJoins(distributedJoins);
+            qry.setEnforceJoinOrder(enforceJoinOrder);
+
+            IgniteCache<Object, Object> cache0 = ctx.grid().cache(req.cacheName());
+
+            if (cache0 == null)
+                return new OdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Cache doesn't exist (did you configure it?): " + req.cacheName());
+
+            IgniteCache<Object, Object> cache = cache0.withKeepBinary();
+
+            if (cache == null)
+                return new OdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Can not get cache with keep binary: " + req.cacheName());
+
+            QueryCursor qryCur = cache.query(qry);
+
+            qryCursors.put(qryId, new IgniteBiTuple<QueryCursor, Iterator>(qryCur, null));
+
+            List<?> fieldsMeta = ((QueryCursorImpl) qryCur).fieldsMeta();
+
+            OdbcQueryExecuteResult res = new OdbcQueryExecuteResult(qryId, convertMetadata(fieldsMeta));
+
+            return new OdbcResponse(res);
+        }
+        catch (Exception e) {
+            qryCursors.remove(qryId);
+
+            U.error(log, "Failed to execute SQL query [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryCloseRequest} command handler.
+     *
+     * @param req Execute query request.
+     * @return Response.
+     */
+    private SqlListenerResponse closeQuery(OdbcQueryCloseRequest req) {
+        try {
+            IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());
+
+            if (tuple == null)
+                return new OdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Failed to find query with ID: " + req.queryId());
+
+            QueryCursor cur = tuple.get1();
+
+            assert(cur != null);
+
+            cur.close();
+
+            qryCursors.remove(req.queryId());
+
+            OdbcQueryCloseResult res = new OdbcQueryCloseResult(req.queryId());
+
+            return new OdbcResponse(res);
+        }
+        catch (Exception e) {
+            qryCursors.remove(req.queryId());
+
+            U.error(log, "Failed to close SQL query [reqId=" + req.requestId() + ", req=" + req.queryId() + ']', e);
+
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryFetchRequest} command handler.
+     *
+     * @param req Execute query request.
+     * @return Response.
+     */
+    private SqlListenerResponse fetchQuery(OdbcQueryFetchRequest req) {
+        try {
+            IgniteBiTuple<QueryCursor, Iterator> tuple = qryCursors.get(req.queryId());
+
+            if (tuple == null)
+                return new OdbcResponse(SqlListenerResponse.STATUS_FAILED,
+                    "Failed to find query with ID: " + req.queryId());
+
+            Iterator iter = tuple.get2();
+
+            if (iter == null) {
+                QueryCursor cur = tuple.get1();
+
+                iter = cur.iterator();
+
+                tuple.put(cur, iter);
+            }
+
+            List<Object> items = new ArrayList<>();
+
+            for (int i = 0; i < req.pageSize() && iter.hasNext(); ++i)
+                items.add(iter.next());
+
+            OdbcQueryFetchResult res = new OdbcQueryFetchResult(req.queryId(), items, !iter.hasNext());
+
+            return new OdbcResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to fetch SQL query result [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryGetColumnsMetaRequest} command handler.
+     *
+     * @param req Get columns metadata request.
+     * @return Response.
+     */
+    private SqlListenerResponse getColumnsMeta(OdbcQueryGetColumnsMetaRequest req) {
+        try {
+            List<OdbcColumnMeta> meta = new ArrayList<>();
+
+            String cacheName;
+            String tableName;
+
+            if (req.tableName().contains(".")) {
+                // Parsing two-part table name.
+                String[] parts = req.tableName().split("\\.");
+
+                cacheName = OdbcUtils.removeQuotationMarksIfNeeded(parts[0]);
+
+                tableName = parts[1];
+            }
+            else {
+                cacheName = OdbcUtils.removeQuotationMarksIfNeeded(req.cacheName());
+
+                tableName = req.tableName();
+            }
+
+            Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
+
+            for (GridQueryTypeDescriptor table : tablesMeta) {
+                if (!matches(table.name(), tableName))
+                    continue;
+
+                for (Map.Entry<String, Class<?>> field : table.fields().entrySet()) {
+                    if (!matches(field.getKey(), req.columnName()))
+                        continue;
+
+                    OdbcColumnMeta columnMeta = new OdbcColumnMeta(req.cacheName(), table.name(),
+                        field.getKey(), field.getValue());
+
+                    if (!meta.contains(columnMeta))
+                        meta.add(columnMeta);
+                }
+            }
+
+            OdbcQueryGetColumnsMetaResult res = new OdbcQueryGetColumnsMetaResult(meta);
+
+            return new OdbcResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to get columns metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryGetTablesMetaRequest} command handler.
+     *
+     * @param req Get tables metadata request.
+     * @return Response.
+     */
+    private SqlListenerResponse getTablesMeta(OdbcQueryGetTablesMetaRequest req) {
+        try {
+            List<OdbcTableMeta> meta = new ArrayList<>();
+
+            String realSchema = OdbcUtils.removeQuotationMarksIfNeeded(req.schema());
+
+            for (String cacheName : ctx.cache().cacheNames())
+            {
+                if (!matches(cacheName, realSchema))
+                    continue;
+
+                Collection<GridQueryTypeDescriptor> tablesMeta = ctx.query().types(cacheName);
+
+                for (GridQueryTypeDescriptor table : tablesMeta) {
+                    if (!matches(table.name(), req.table()))
+                        continue;
+
+                    if (!matches("TABLE", req.tableType()))
+                        continue;
+
+                    OdbcTableMeta tableMeta = new OdbcTableMeta(null, cacheName, table.name(), "TABLE");
+
+                    if (!meta.contains(tableMeta))
+                        meta.add(tableMeta);
+                }
+            }
+
+            OdbcQueryGetTablesMetaResult res = new OdbcQueryGetTablesMetaResult(meta);
+
+            return new OdbcResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to get tables metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * {@link OdbcQueryGetParamsMetaRequest} command handler.
+     *
+     * @param req Get params metadata request.
+     * @return Response.
+     */
+    private SqlListenerResponse getParamsMeta(OdbcQueryGetParamsMetaRequest req) {
+        try {
+            PreparedStatement stmt = ctx.query().prepareNativeStatement(req.cacheName(), req.query());
+
+            ParameterMetaData pmd = stmt.getParameterMetaData();
+
+            byte[] typeIds = new byte[pmd.getParameterCount()];
+
+            for (int i = 1; i <= pmd.getParameterCount(); ++i) {
+                int sqlType = pmd.getParameterType(i);
+
+                typeIds[i - 1] = sqlTypeToBinary(sqlType);
+            }
+
+            OdbcQueryGetParamsMetaResult res = new OdbcQueryGetParamsMetaResult(typeIds);
+
+            return new OdbcResponse(res);
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to get params metadata [reqId=" + req.requestId() + ", req=" + req + ']', e);
+
+            return new OdbcResponse(SqlListenerResponse.STATUS_FAILED, e.toString());
+        }
+    }
+
+    /**
+     * Convert {@link java.sql.Types} to binary type constant (See {@link GridBinaryMarshaller} constants).
+     *
+     * @param sqlType SQL type.
+     * @return Binary type.
+     */
+    private static byte sqlTypeToBinary(int sqlType) {
+        switch (sqlType) {
+            case Types.BIGINT:
+                return GridBinaryMarshaller.LONG;
+
+            case Types.BOOLEAN:
+                return GridBinaryMarshaller.BOOLEAN;
+
+            case Types.DATE:
+                return GridBinaryMarshaller.DATE;
+
+            case Types.DOUBLE:
+                return GridBinaryMarshaller.DOUBLE;
+
+            case Types.FLOAT:
+            case Types.REAL:
+                return GridBinaryMarshaller.FLOAT;
+
+            case Types.NUMERIC:
+            case Types.DECIMAL:
+                return GridBinaryMarshaller.DECIMAL;
+
+            case Types.INTEGER:
+                return GridBinaryMarshaller.INT;
+
+            case Types.SMALLINT:
+                return GridBinaryMarshaller.SHORT;
+
+            case Types.TIME:
+                return GridBinaryMarshaller.TIME;
+
+            case Types.TIMESTAMP:
+                return GridBinaryMarshaller.TIMESTAMP;
+
+            case Types.TINYINT:
+                return GridBinaryMarshaller.BYTE;
+
+            case Types.CHAR:
+            case Types.VARCHAR:
+            case Types.LONGNVARCHAR:
+                return GridBinaryMarshaller.STRING;
+
+            case Types.NULL:
+                return GridBinaryMarshaller.NULL;
+
+            case Types.BINARY:
+            case Types.VARBINARY:
+            case Types.LONGVARBINARY:
+            default:
+                return GridBinaryMarshaller.BYTE_ARR;
+        }
+    }
+
+    /**
+     * Convert metadata in collection from {@link GridQueryFieldMetadata} to
+     * {@link OdbcColumnMeta}.
+     *
+     * @param meta Internal query field metadata.
+     * @return Odbc query field metadata.
+     */
+    private static Collection<OdbcColumnMeta> convertMetadata(Collection<?> meta) {
+        List<OdbcColumnMeta> res = new ArrayList<>();
+
+        if (meta != null) {
+            for (Object info : meta) {
+                assert info instanceof GridQueryFieldMetadata;
+
+                res.add(new OdbcColumnMeta((GridQueryFieldMetadata)info));
+            }
+        }
+
+        return res;
+    }
+
+    /**
+     * Checks whether string matches SQL pattern.
+     *
+     * @param str String.
+     * @param ptrn Pattern.
+     * @return Whether string matches pattern.
+     */
+    private static boolean matches(String str, String ptrn) {
+        return str != null && (F.isEmpty(ptrn) ||
+            str.toUpperCase().matches(ptrn.toUpperCase().replace("%", ".*").replace("_", ".")));
+    }
+}


Mime
View raw message