ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [07/52] ignite git commit: IGNITE-2294: Implemented DML.
Date Mon, 05 Dec 2016 12:38:11 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
index e15e770..b80f573 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectExImpl.java
@@ -23,12 +23,13 @@ import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryArrayIdentityResolver;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryIdentityResolver;
 import org.apache.ignite.binary.BinaryType;
 import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
-import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
@@ -50,7 +51,7 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
     /**
      * @return {@code True} if object is array based.
      */
-    protected abstract boolean hasArray();
+    public abstract boolean hasArray();
 
     /**
      * @return Object array if object is array based, otherwise {@code null}.
@@ -81,22 +82,29 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
      *
      * @return Field value.
      */
-    @Nullable protected abstract int dataStartOffset();
+    public abstract int dataStartOffset();
 
     /**
      * Get offset of the footer begin.
      *
      * @return Field value.
      */
-    @Nullable protected abstract int footerStartOffset();
+    public abstract int footerStartOffset();
 
     /**
      * Get field by offset.
      *
-     * @param order Field order.
+     * @param order Field offset.
      * @return Field value.
      */
-    @Nullable protected abstract <F> F fieldByOrder(int order);
+    @Nullable public abstract <F> F fieldByOrder(int order);
+
+    /**
+     * Create field comparer.
+     *
+     * @return Comparer.
+     */
+    public abstract BinarySerializedFieldComparator createFieldComparator();
 
     /**
      * @param ctx Reader context.
@@ -106,18 +114,30 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
     @Nullable protected abstract <F> F field(BinaryReaderHandles ctx, String fieldName);
 
     /**
+     * @return {@code True} if object has schema.
+     */
+    public abstract boolean hasSchema();
+
+    /**
      * Get schema ID.
      *
      * @return Schema ID.
      */
-    protected abstract int schemaId();
+    public abstract int schemaId();
 
     /**
      * Create schema for object.
      *
      * @return Schema.
      */
-    protected abstract BinarySchema createSchema();
+    public abstract BinarySchema createSchema();
+
+    /**
+     * Get binary context.
+     *
+     * @return Binary context.
+     */
+    public abstract BinaryContext context();
 
     /** {@inheritDoc} */
     @Override public BinaryObjectBuilder toBuilder() throws BinaryObjectException {
@@ -134,58 +154,15 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
         if (other == this)
             return true;
 
-        if (other == null)
-            return false;
-
-        if (!(other instanceof BinaryObjectExImpl))
-            return false;
-
-        BinaryObjectExImpl other0 = (BinaryObjectExImpl)other;
-
-        if (typeId() != other0.typeId())
-            return false;
-
-        int start = dataStartOffset();
-        int end = footerStartOffset();
-
-        int otherStart = other0.dataStartOffset();
-        int otherEnd = other0.footerStartOffset();
-
-        int len = end - start;
-
-        if (len != otherEnd - otherStart)
+        if (!(other instanceof BinaryObject))
             return false;
 
-        if (hasArray()) {
-            byte[] arr = array();
+        BinaryIdentityResolver identity = context().identity(typeId());
 
-            if (other0.hasArray()) {
-                byte[] otherArr = other0.array();
+        if (identity == null)
+            identity = BinaryArrayIdentityResolver.instance();
 
-                for (int i = start, j = otherStart; i < end; i++, j++) {
-                    if (arr[i] != otherArr[j])
-                        return false;
-                }
-
-                return true;
-            }
-            else {
-                assert other0.offheapAddress() > 0;
-
-                return GridUnsafeMemory.compare(other0.offheapAddress() + otherStart, arr, start, len);
-            }
-        }
-        else {
-            assert offheapAddress() > 0;
-
-            if (other0.hasArray())
-                return GridUnsafeMemory.compare(offheapAddress() + start, other0.array(), otherStart, len);
-            else {
-                assert other0.offheapAddress() > 0;
-
-                return GridUnsafeMemory.compare(offheapAddress() + start, other0.offheapAddress() + otherStart, len);
-            }
-        }
+        return identity.equals(this, (BinaryObject)other);
     }
 
     /** {@inheritDoc} */
@@ -250,6 +227,7 @@ public abstract class BinaryObjectExImpl implements BinaryObjectEx {
      * @param ctx Reader context.
      * @param handles Handles for already traversed objects.
      */
+    @SuppressWarnings("unchecked")
     private void appendValue(Object val, SB buf, BinaryReaderHandles ctx,
         IdentityHashMap<BinaryObject, Integer> handles) {
         if (val instanceof byte[])

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
index 54a7c08..360c71a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectImpl.java
@@ -211,10 +211,8 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
         this.detachAllowed = detachAllowed;
     }
 
-    /**
-     * @return Context.
-     */
-    public BinaryContext context() {
+    /** {@inheritDoc} */
+    @Override public BinaryContext context() {
         return ctx;
     }
 
@@ -241,7 +239,7 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean hasArray() {
+    @Override public boolean hasArray() {
         return true;
     }
 
@@ -296,20 +294,34 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override protected int dataStartOffset() {
+    @Override public BinarySerializedFieldComparator createFieldComparator() {
+        int schemaOff = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
+
+        short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
+
+        int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN;
+        int fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
+
+        int orderBase = start + schemaOff + fieldIdLen;
+        int orderMultiplier = fieldIdLen + fieldOffLen;
+
+        return new BinarySerializedFieldComparator(this, arr, 0L, start, orderBase, orderMultiplier, fieldOffLen);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int dataStartOffset() {
         int typeId = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.TYPE_ID_POS);
 
         if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
             int len = BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.DFLT_HDR_LEN + 1);
 
             return start + GridBinaryMarshaller.DFLT_HDR_LEN + len + 5;
-        }
-        else
+        } else
             return start + GridBinaryMarshaller.DFLT_HDR_LEN;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override protected int footerStartOffset() {
+    @Override public int footerStartOffset() {
         short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
 
         if (!BinaryUtils.hasSchema(flags))
@@ -318,9 +330,12 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
         return start + BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
     }
 
-   /** {@inheritDoc} */
+    /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override protected <F> F fieldByOrder(int order) {
+    @Nullable @Override public <F> F fieldByOrder(int order) {
+        if (order == BinarySchema.ORDER_NOT_FOUND)
+            return null;
+
         Object val;
 
         // Calculate field position.
@@ -490,12 +505,19 @@ public final class BinaryObjectImpl extends BinaryObjectExImpl implements Extern
     }
 
     /** {@inheritDoc} */
-    @Override protected int schemaId() {
+    @Override public boolean hasSchema() {
+        short flags = BinaryPrimitives.readShort(arr, start + GridBinaryMarshaller.FLAGS_POS);
+
+        return BinaryUtils.hasSchema(flags);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int schemaId() {
         return BinaryPrimitives.readInt(arr, start + GridBinaryMarshaller.SCHEMA_ID_POS);
     }
 
     /** {@inheritDoc} */
-    @Override protected BinarySchema createSchema() {
+    @Override public BinarySchema createSchema() {
         return reader(null, false).getOrCreateSchema();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
index 7550b19..354ac11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryObjectOffheapImpl.java
@@ -96,7 +96,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
         if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
             int off = start + GridBinaryMarshaller.DFLT_HDR_LEN;
 
-            String clsName = BinaryUtils.doReadClassName(new BinaryOffheapInputStream(ptr + off, size));
+            String clsName = BinaryUtils.doReadClassName(new BinaryOffheapInputStream(off, size));
 
             typeId = ctx.typeId(clsName);
         }
@@ -115,16 +115,28 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
     }
 
     /** {@inheritDoc} */
-    @Override protected int schemaId() {
+    @Override public boolean hasSchema() {
+        short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
+
+        return BinaryUtils.hasSchema(flags);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int schemaId() {
         return BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_ID_POS);
     }
 
     /** {@inheritDoc} */
-    @Override protected BinarySchema createSchema() {
+    @Override public BinarySchema createSchema() {
         return reader(null, false).getOrCreateSchema();
     }
 
     /** {@inheritDoc} */
+    @Override public BinaryContext context() {
+        return ctx;
+    }
+
+    /** {@inheritDoc} */
     @Override public int start() {
         return start;
     }
@@ -140,7 +152,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
     }
 
     /** {@inheritDoc} */
-    @Override protected boolean hasArray() {
+    @Override public boolean hasArray() {
         return false;
     }
 
@@ -174,7 +186,22 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override protected int dataStartOffset() {
+    @Override public BinarySerializedFieldComparator createFieldComparator() {
+        int schemaOff = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.SCHEMA_OR_RAW_OFF_POS);
+
+        short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
+
+        int fieldIdLen = BinaryUtils.isCompactFooter(flags) ? 0 : BinaryUtils.FIELD_ID_LEN;
+        int fieldOffLen = BinaryUtils.fieldOffsetLength(flags);
+
+        int orderBase = start + schemaOff + fieldIdLen;
+        int orderMultiplier = fieldIdLen + fieldOffLen;
+
+        return new BinarySerializedFieldComparator(this, null, ptr, start, orderBase, orderMultiplier, fieldOffLen);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int dataStartOffset() {
         int typeId = BinaryPrimitives.readInt(ptr, start + GridBinaryMarshaller.TYPE_ID_POS);
 
         if (typeId == GridBinaryMarshaller.UNREGISTERED_TYPE_ID) {
@@ -186,7 +213,7 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override protected int footerStartOffset() {
+    @Override public int footerStartOffset() {
         short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
 
         if (!BinaryUtils.hasSchema(flags))
@@ -197,7 +224,10 @@ public class BinaryObjectOffheapImpl extends BinaryObjectExImpl implements Exter
 
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
-    @Nullable @Override protected <F> F fieldByOrder(int order) {
+    @Nullable @Override public <F> F fieldByOrder(int order) {
+        if (order == BinarySchema.ORDER_NOT_FOUND)
+            return null;
+
         Object val;
 
         // Calculate field position.

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
index 8b82fad..86f5040 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryPrimitives.java
@@ -120,6 +120,18 @@ public abstract class BinaryPrimitives {
     }
 
     /**
+     * @param ptr Pointer.
+     * @param off Offset.
+     * @param val Value.
+     */
+    public static void writeShort(long ptr, int off, short val) {
+        if (BIG_ENDIAN)
+            GridUnsafe.putShortLE(ptr + off, val);
+        else
+            GridUnsafe.putShort(ptr + off, val);
+    }
+
+    /**
      * @param arr Array.
      * @param off Offset.
      * @return Value.
@@ -228,6 +240,18 @@ public abstract class BinaryPrimitives {
     }
 
     /**
+     * @param ptr Pointer.
+     * @param off Offset.
+     * @param val Value.
+     */
+    public static void writeInt(long ptr, int off, int val) {
+        if (BIG_ENDIAN)
+            GridUnsafe.putIntLE(ptr + off, val);
+        else
+            GridUnsafe.putInt(ptr + off, val);
+    }
+
+    /**
      * @param arr Array.
      * @param off Offset.
      * @return Value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java
new file mode 100644
index 0000000..130bb0c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinarySerializedFieldComparator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary;
+
+import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
+import org.apache.ignite.internal.util.typedef.F;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+
+/**
+ * Compares fiels in serialized form when possible.
+ */
+public class BinarySerializedFieldComparator {
+    /** Position: not found. */
+    private static final int POS_NOT_FOUND = -1;
+
+    /** Original object. */
+    private final BinaryObjectExImpl obj;
+
+    /** Pointer to data (onheap). */
+    private final byte[] arr;
+
+    /** Pointer to data (offheap). */
+    private final long ptr;
+
+    /** Object start offset. */
+    private final int startOff;
+
+    /** Order base. */
+    private final int orderBase;
+
+    /** Order multiplier. */
+    private final int orderMultiplier;
+
+    /** Field offset length. */
+    private final int fieldOffLen;
+
+    /** Current field order. */
+    private int curFieldOrder;
+
+    /** Current field offset. */
+    private int curFieldPos;
+
+    /**
+     * Constructor.
+     *
+     * @param obj Original object.
+     * @param arr Array.
+     * @param ptr Pointer.
+     * @param startOff Start offset.
+     * @param orderBase Order base.
+     * @param orderMultiplier Order multiplier.
+     * @param fieldOffLen Field offset length.
+     */
+    public BinarySerializedFieldComparator(BinaryObjectExImpl obj, byte[] arr, long ptr, int startOff, int orderBase,
+        int orderMultiplier, int fieldOffLen) {
+        assert arr != null && ptr == 0L || arr == null && ptr != 0L;
+
+        this.obj = obj;
+        this.arr = arr;
+        this.ptr = ptr;
+        this.startOff = startOff;
+        this.orderBase = orderBase;
+        this.orderMultiplier = orderMultiplier;
+        this.fieldOffLen = fieldOffLen;
+    }
+
+    /**
+     * Locate the field.
+     *
+     * @param order Field order.
+     */
+    public void findField(int order) {
+        curFieldOrder = order;
+
+        if (order == BinarySchema.ORDER_NOT_FOUND)
+            curFieldPos = POS_NOT_FOUND;
+        else {
+            int pos = orderBase + order * orderMultiplier;
+
+            if (fieldOffLen == BinaryUtils.OFFSET_1) {
+                byte val = offheap() ? BinaryPrimitives.readByte(ptr, pos) : BinaryPrimitives.readByte(arr, pos);
+
+                curFieldPos = startOff + ((int)val & 0xFF);
+            }
+            else if (fieldOffLen == BinaryUtils.OFFSET_2) {
+                short val = offheap() ? BinaryPrimitives.readShort(ptr, pos) : BinaryPrimitives.readShort(arr, pos);
+
+                curFieldPos = startOff + ((int)val & 0xFFFF);
+            }
+            else {
+                int val = offheap() ? BinaryPrimitives.readInt(ptr, pos) : BinaryPrimitives.readInt(arr, pos);
+
+                curFieldPos = startOff + val;
+            }
+        }
+    }
+
+    /**
+     * Get field type.
+     *
+     * @return Field type.
+     */
+    private byte fieldType() {
+        if (curFieldPos == POS_NOT_FOUND)
+            return GridBinaryMarshaller.NULL;
+        else
+            return offheap() ?
+                BinaryPrimitives.readByte(ptr, curFieldPos) : BinaryPrimitives.readByte(arr, curFieldPos);
+    }
+
+    /**
+     * @return Whether this is offheap object.
+     */
+    private boolean offheap() {
+        return ptr != 0L;
+    }
+
+    /**
+     * Get current field.
+     *
+     * @return Current field.
+     */
+    private Object currentField() {
+        return obj.fieldByOrder(curFieldOrder);
+    }
+
+    /**
+     * Read byte value.
+     *
+     * @param off Offset.
+     * @return Value.
+     */
+    private byte readByte(int off) {
+        if (offheap())
+            return BinaryPrimitives.readByte(ptr, curFieldPos + off);
+        else
+            return arr[curFieldPos + off];
+    }
+
+    /**
+     * Read short value.
+     *
+     * @param off Offset.
+     * @return Value.
+     */
+    private short readShort(int off) {
+        if (offheap())
+            return BinaryPrimitives.readShort(ptr, curFieldPos + off);
+        else
+            return BinaryPrimitives.readShort(arr, curFieldPos + off);
+    }
+
+    /**
+     * Read int value.
+     *
+     * @param off Offset.
+     * @return Value.
+     */
+    private int readInt(int off) {
+        if (offheap())
+            return BinaryPrimitives.readInt(ptr, curFieldPos + off);
+        else
+            return BinaryPrimitives.readInt(arr, curFieldPos + off);
+    }
+
+    /**
+     * Read long value.
+     *
+     * @param off Offset.
+     * @return Value.
+     */
+    private long readLong(int off) {
+        if (offheap())
+            return BinaryPrimitives.readLong(ptr, curFieldPos + off);
+        else
+            return BinaryPrimitives.readLong(arr, curFieldPos + off);
+    }
+
+    /**
+     * Compare fields.
+     *
+     * @param c1 First comparer.
+     * @param c2 Second comparer.
+     * @return {@code True} if both fields are equal.
+     */
+    public static boolean equals(BinarySerializedFieldComparator c1, BinarySerializedFieldComparator c2) {
+        // Compare field types.
+        byte typ = c1.fieldType();
+
+        if (typ != c2.fieldType())
+            return false;
+
+        // Switch by type and compare.
+        switch (typ) {
+            case GridBinaryMarshaller.BYTE:
+            case GridBinaryMarshaller.BOOLEAN:
+                return c1.readByte(1) == c2.readByte(1);
+
+            case GridBinaryMarshaller.SHORT:
+            case GridBinaryMarshaller.CHAR:
+                return c1.readShort(1) == c2.readShort(1);
+
+            case GridBinaryMarshaller.INT:
+            case GridBinaryMarshaller.FLOAT:
+                return c1.readInt(1) == c2.readInt(1);
+
+            case GridBinaryMarshaller.LONG:
+            case GridBinaryMarshaller.DOUBLE:
+            case GridBinaryMarshaller.DATE:
+                return c1.readLong(1) == c2.readLong(1);
+
+            case GridBinaryMarshaller.TIMESTAMP:
+                return c1.readLong(1) == c2.readLong(1) && c1.readInt(1 + 8) == c2.readInt(1 + 8);
+
+            case GridBinaryMarshaller.UUID:
+                return c1.readLong(1) == c2.readLong(1) && c1.readLong(1 + 8) == c2.readLong(1 + 8);
+
+            case GridBinaryMarshaller.STRING:
+                return compareByteArrays(c1, c2, 1);
+
+            case GridBinaryMarshaller.DECIMAL:
+                return c1.readInt(1) == c2.readInt(1) && compareByteArrays(c1, c2, 5);
+
+            case GridBinaryMarshaller.NULL:
+                return true;
+
+            default:
+                Object val1 = c1.currentField();
+                Object val2 = c2.currentField();
+
+                return isArray(val1) ? compareArrays(val1, val2) : F.eq(val1, val2);
+        }
+    }
+
+    /**
+     * Compare arrays.
+     *
+     * @param val1 Value 1.
+     * @param val2 Value 2.
+     * @return Result.
+     */
+    private static boolean compareArrays(Object val1, Object val2) {
+        if (val1.getClass() == val2.getClass()) {
+            if (val1 instanceof byte[])
+                return Arrays.equals((byte[])val1, (byte[])val2);
+            else if (val1 instanceof boolean[])
+                return Arrays.equals((boolean[])val1, (boolean[])val2);
+            else if (val1 instanceof short[])
+                return Arrays.equals((short[])val1, (short[])val2);
+            else if (val1 instanceof char[])
+                return Arrays.equals((char[])val1, (char[])val2);
+            else if (val1 instanceof int[])
+                return Arrays.equals((int[])val1, (int[])val2);
+            else if (val1 instanceof long[])
+                return Arrays.equals((long[])val1, (long[])val2);
+            else if (val1 instanceof float[])
+                return Arrays.equals((float[])val1, (float[])val2);
+            else if (val1 instanceof double[])
+                return Arrays.equals((double[])val1, (double[])val2);
+            else
+                return Arrays.deepEquals((Object[])val1, (Object[])val2);
+        }
+
+        return false;
+    }
+
+    /**
+     * @param field Field.
+     * @return {@code True} if field is array.
+     */
+    private static boolean isArray(@Nullable Object field) {
+        return field != null && field.getClass().isArray();
+    }
+
+    /**
+     * Compare byte arrays.
+     *
+     * @param c1 Comparer 1.
+     * @param c2 Comparer 2.
+     * @param off Offset (where length is located).
+     * @return {@code True} if equal.
+     */
+    private static boolean compareByteArrays(BinarySerializedFieldComparator c1, BinarySerializedFieldComparator c2,
+                                             int off) {
+        int len = c1.readInt(off);
+
+        if (len != c2.readInt(off))
+            return false;
+        else {
+            off += 4;
+
+            if (c1.offheap()) {
+                if (c2.offheap())
+                    // Case 1: both offheap.
+                    return GridUnsafeMemory.compare(c1.curFieldPos + c1.ptr + off, c2.curFieldPos + c2.ptr + off, len);
+            }
+            else {
+                if (!c2.offheap()) {
+                    // Case 2: both onheap.
+                    for (int i = 0; i < len; i++) {
+                        if (c1.arr[c1.curFieldPos + off + i] != c2.arr[c2.curFieldPos + off + i])
+                            return false;
+                    }
+
+                    return true;
+                }
+                else {
+                    // Swap.
+                    BinarySerializedFieldComparator tmp = c1;
+                    c1 = c2;
+                    c2 = tmp;
+                }
+            }
+
+            // Case 3: offheap vs onheap.
+            assert c1.offheap() && !c2.offheap();
+
+            for (int i = 0; i < len; i++) {
+                if (BinaryPrimitives.readByte(c1.ptr, c1.curFieldPos + off + i) != c2.arr[c2.curFieldPos + off + i])
+                    return false;
+            }
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index b304082..cb6e641 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -312,7 +312,7 @@ public class BinaryUtils {
      * @param flag Flag.
      * @return {@code True} if flag is set in flags.
      */
-    static boolean isFlagSet(short flags, short flag) {
+    public static boolean isFlagSet(short flags, short flag) {
         return (flags & flag) == flag;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
index 1de0a65..adaacdd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryWriterExImpl.java
@@ -29,6 +29,7 @@ import java.util.Date;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryIdentityResolver;
 import org.apache.ignite.binary.BinaryObjectException;
 import org.apache.ignite.binary.BinaryRawWriter;
 import org.apache.ignite.binary.BinaryWriter;
@@ -326,6 +327,48 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
     }
 
     /**
+     * Perform post-write hash code update if necessary.
+     *
+     * @param clsName Class name. Always null if class is registered.
+     */
+    public void postWriteHashCode(@Nullable String clsName) {
+        int typeId = clsName == null ? this.typeId : ctx.typeId(clsName);
+
+        BinaryIdentityResolver identity = ctx.identity(typeId);
+
+        if (identity != null) {
+            if (out.hasArray()) {
+                // Heap.
+                byte[] data = out.array();
+
+                BinaryObjectImpl obj = new BinaryObjectImpl(ctx, data, start);
+
+                short flags = BinaryPrimitives.readShort(data, start + GridBinaryMarshaller.FLAGS_POS);
+
+                BinaryPrimitives.writeShort(data, start + GridBinaryMarshaller.FLAGS_POS,
+                    (short) (flags & ~BinaryUtils.FLAG_EMPTY_HASH_CODE));
+
+                BinaryPrimitives.writeInt(data, start + GridBinaryMarshaller.HASH_CODE_POS, identity.hashCode(obj));
+            }
+            else {
+                // Offheap.
+                long ptr = out.rawOffheapPointer();
+
+                assert ptr != 0;
+
+                BinaryObjectOffheapImpl obj = new BinaryObjectOffheapImpl(ctx, ptr, start, out.capacity());
+
+                short flags = BinaryPrimitives.readShort(ptr, start + GridBinaryMarshaller.FLAGS_POS);
+
+                BinaryPrimitives.writeShort(ptr, start + GridBinaryMarshaller.FLAGS_POS,
+                    (short) (flags & ~BinaryUtils.FLAG_EMPTY_HASH_CODE));
+
+                BinaryPrimitives.writeInt(ptr, start + GridBinaryMarshaller.HASH_CODE_POS, identity.hashCode(obj));
+            }
+        }
+    }
+
+    /**
      * Pop schema.
      */
     public void popSchema() {
@@ -337,8 +380,6 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
      * @param val Byte array.
      */
     public void write(byte[] val) {
-        assert val != null;
-
         out.writeByteArray(val);
     }
 
@@ -348,8 +389,6 @@ public class BinaryWriterExImpl implements BinaryWriter, BinaryRawWriterEx, Obje
      * @param len Length.
      */
     public void write(byte[] val, int off, int len) {
-        assert val != null;
-
         out.write(val, off, len);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
index f0bc874..ddd2423 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/builder/BinaryObjectBuilderImpl.java
@@ -130,6 +130,8 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
      * @param start Start.
      */
     BinaryObjectBuilderImpl(BinaryBuilderReader reader, int start) {
+        assert reader != null;
+
         this.reader = reader;
         this.start = start;
         this.flags = reader.readShortPositioned(start + GridBinaryMarshaller.FLAGS_POS);
@@ -193,6 +195,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
      * @param writer Writer.
      * @param serializer Serializer.
      */
+    @SuppressWarnings("ResultOfMethodCallIgnored")
     void serializeTo(BinaryWriterExImpl writer, BinaryBuilderSerializer serializer) {
         try {
             writer.preWrite(registeredType ? null : clsNameToWrite);
@@ -357,6 +360,9 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
 
                 schemaReg.addSchema(curSchema.schemaId(), curSchema);
             }
+
+            // Update hash code after schema is written.
+            writer.postWriteHashCode(registeredType ? null : clsNameToWrite);
         }
         finally {
             writer.popSchema();

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
index b6c30bb..46aa03d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractInputStream.java
@@ -289,6 +289,11 @@ public abstract class BinaryAbstractInputStream extends BinaryAbstractStream
         return 0;
     }
 
+    /** {@inheritDoc} */
+    @Override public long rawOffheapPointer() {
+        return 0;
+    }
+
     /**
      * Ensure that there is enough data.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
index b9df68e..769031f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryAbstractOutputStream.java
@@ -244,6 +244,11 @@ public abstract class BinaryAbstractOutputStream extends BinaryAbstractStream
     }
 
     /** {@inheritDoc} */
+    @Override public long rawOffheapPointer() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
     @Override public void unsafeEnsure(int cap) {
         ensureCapacity(pos + cap);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
index b584373..b5edc02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapInputStream.java
@@ -92,6 +92,11 @@ public final class BinaryHeapInputStream extends BinaryAbstractInputStream {
     }
 
     /** {@inheritDoc} */
+    @Override public int capacity() {
+        return data.length;
+    }
+
+    /** {@inheritDoc} */
     @Override public byte[] array() {
         return data;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
index 2c31641..f06c980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryHeapOutputStream.java
@@ -209,4 +209,9 @@ public final class BinaryHeapOutputStream extends BinaryAbstractOutputStream {
 
         shift(8);
     }
+
+    /** {@inheritDoc} */
+    @Override public int capacity() {
+        return data.length;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
index 9230846..9dc92c6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapInputStream.java
@@ -66,6 +66,11 @@ public class BinaryOffheapInputStream extends BinaryAbstractInputStream {
     }
 
     /** {@inheritDoc} */
+    @Override public int capacity() {
+        return cap;
+    }
+
+    /** {@inheritDoc} */
     @Override public byte[] array() {
         return arrayCopy();
     }
@@ -147,4 +152,9 @@ public class BinaryOffheapInputStream extends BinaryAbstractInputStream {
     @Override public long offheapPointer() {
         return forceHeap ? 0 : ptr;
     }
+
+    /** {@inheritDoc} */
+    @Override public long rawOffheapPointer() {
+        return ptr;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
index 1cb9f4f..be9f7d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryOffheapOutputStream.java
@@ -89,9 +89,7 @@ public class BinaryOffheapOutputStream extends BinaryAbstractOutputStream {
         return ptr;
     }
 
-    /**
-     * @return Capacity.
-     */
+    /** {@inheritDoc} */
     public int capacity() {
         return cap;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
index b868199..5bdd644 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/streams/BinaryStream.java
@@ -42,12 +42,22 @@ public interface BinaryStream {
     public byte[] arrayCopy();
 
     /**
-     * @return Offheap pointer if stream is offheap based, otherwise {@code 0}.
+     * @return Offheap pointer if stream is offheap based and "forceHeap" flag is not set; otherwise {@code 0}.
      */
     public long offheapPointer();
 
     /**
+     * @return Offheap pointer if stream is offheap based; otherwise {@code 0}.
+     */
+    public long rawOffheapPointer();
+
+    /**
      * @return {@code True} is stream is array based.
      */
     public boolean hasArray();
+
+    /**
+     * @return Total capacity.
+     */
+    public int capacity();
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
index 6f0d9c4..5c4a147 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcConnection.java
@@ -55,6 +55,7 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.compute.ComputeTaskTimeoutException;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgnitionEx;
+import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.F;
@@ -697,6 +698,13 @@ public class JdbcConnection implements Connection {
     }
 
     /**
+     * @return {@code true} if target node has DML support, {@code false} otherwise.
+     */
+    boolean isDmlSupported() {
+        return ignite.version().greaterThanEqual(1, 8, 0);
+    }
+
+    /**
      * @return Local query flag.
      */
     boolean isLocalQuery() {
@@ -736,6 +744,15 @@ public class JdbcConnection implements Connection {
     }
 
     /**
+     * @param sql Query.
+     * @return {@link PreparedStatement} from underlying engine to supply metadata to Prepared - most likely H2.
+     */
+    PreparedStatement prepareNativeStatement(String sql) throws SQLException {
+        return ((IgniteCacheProxy) ignite().cache(cacheName())).context()
+            .kernalContext().query().prepareNativeStatement(cacheName(), sql);
+    }
+
+    /**
      * JDBC connection validation task.
      */
     private static class JdbcConnectionValidationTask implements IgniteCallable<Boolean> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
index a99f24c..57badd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcPreparedStatement.java
@@ -31,8 +31,10 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     /** SQL query. */
     private final String sql;
 
-    /** Arguments count. */
-    private final int argsCnt;
+    /**
+     * H2's parsed statement to retrieve metadata from.
+     */
+    private PreparedStatement nativeStatement;
 
     /**
      * Creates new prepared statement.
@@ -44,12 +46,21 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
         super(conn);
 
         this.sql = sql;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addBatch(String sql) throws SQLException {
+        ensureNotClosed();
 
-        argsCnt = sql.replaceAll("[^?]", "").length();
+        throw new SQLFeatureNotSupportedException("Adding new SQL command to batch not supported for prepared statement.");
     }
 
+
+
     /** {@inheritDoc} */
     @Override public ResultSet executeQuery() throws SQLException {
+        ensureNotClosed();
+
         ResultSet rs = executeQuery(sql);
 
         args = null;
@@ -61,7 +72,11 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public int executeUpdate() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        int res = executeUpdate(sql);
+
+        args = null;
+
+        return res;
     }
 
     /** {@inheritDoc} */
@@ -163,6 +178,13 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     }
 
     /** {@inheritDoc} */
+    @Override public void clearBatch() throws SQLException {
+        ensureNotClosed();
+
+        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+    }
+
+    /** {@inheritDoc} */
     @Override public void setObject(int paramIdx, Object x, int targetSqlType) throws SQLException {
         setArgument(paramIdx, x);
     }
@@ -181,7 +203,12 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public void addBatch() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
+    }
+
+    /** {@inheritDoc} */
+    @Override public int[] executeBatch() throws SQLException {
+        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
     }
 
     /** {@inheritDoc} */
@@ -223,7 +250,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public ResultSetMetaData getMetaData() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Meta data for prepared statement is not supported.");
+        return getNativeStatement().getMetaData();
     }
 
     /** {@inheritDoc} */
@@ -255,7 +282,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     @Override public ParameterMetaData getParameterMetaData() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Meta data for prepared statement is not supported.");
+        return getNativeStatement().getParameterMetaData();
     }
 
     /** {@inheritDoc} */
@@ -400,12 +427,36 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
     private void setArgument(int paramIdx, Object val) throws SQLException {
         ensureNotClosed();
 
-        if (paramIdx < 1 || paramIdx > argsCnt)
+        if (paramIdx < 1)
             throw new SQLException("Parameter index is invalid: " + paramIdx);
 
+        ensureArgsSize(paramIdx);
+
+        args.set(paramIdx - 1, val);
+    }
+
+    /**
+     * Initialize {@link #args} and increase its capacity and size up to given argument if needed.
+     * @param size new expected size.
+     */
+    private void ensureArgsSize(int size) {
         if (args == null)
-            args = new Object[argsCnt];
+            args = new ArrayList<>(size);
+
+        args.ensureCapacity(size);
+
+        while (args.size() < size)
+            args.add(null);
+    }
+
+    /**
+     * @return H2's prepared statement to get metadata from.
+     * @throws SQLException if failed.
+     */
+    private PreparedStatement getNativeStatement() throws SQLException {
+        if (nativeStatement != null)
+            return nativeStatement;
 
-        args[paramIdx - 1] = val;
+        return (nativeStatement = conn.prepareNativeStatement(sql));
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
index c4911cb..0b23f9b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTask.java
@@ -49,7 +49,10 @@ import org.apache.ignite.resources.IgniteInstanceResource;
  * Not closed cursors will be removed after {@link #RMV_DELAY} milliseconds.
  * This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
  * system property.
+ *
+ * Deprecated due to introduction of DML features - see {@link JdbcQueryTaskV2}.
  */
+@Deprecated
 class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
     /** Serial version uid. */
     private static final long serialVersionUID = 0L;
@@ -178,7 +181,7 @@ class JdbcQueryTask implements IgniteCallable<JdbcQueryTask.QueryResult> {
             List<Object> row0 = new ArrayList<>(row.size());
 
             for (Object val : row)
-                row0.add(JdbcUtils.sqlType(val) ? val : val.toString());
+                row0.add(val == null || JdbcUtils.isSqlType(val.getClass()) ? val : val.toString());
 
             rows.add(row0);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java
new file mode 100644
index 0000000..9093d15
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcQueryTaskV2.java
@@ -0,0 +1,406 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import java.io.Serializable;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteJdbcDriver;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+
+/**
+ * Task for SQL queries execution through {@link IgniteJdbcDriver}.
+ * <p>
+ * Not closed cursors will be removed after {@link #RMV_DELAY} milliseconds.
+ * This parameter can be configured via {@link IgniteSystemProperties#IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY}
+ * system property.
+ */
+class JdbcQueryTaskV2 implements IgniteCallable<JdbcQueryTaskV2.QueryResult> {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** How long to store open cursor. */
+    private static final long RMV_DELAY = IgniteSystemProperties.getLong(
+        IgniteSystemProperties.IGNITE_JDBC_DRIVER_CURSOR_REMOVE_DELAY, 600000);
+
+    /** Scheduler. */
+    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1);
+
+    /** Open cursors. */
+    private static final ConcurrentMap<UUID, Cursor> CURSORS = new ConcurrentHashMap<>();
+
+    /** Ignite. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** Uuid. */
+    private final UUID uuid;
+
+    /** Cache name. */
+    private final String cacheName;
+
+    /** Sql. */
+    private final String sql;
+
+    /** Operation type flag - query or not. */
+    private Boolean isQry;
+
+    /** Args. */
+    private final Object[] args;
+
+    /** Fetch size. */
+    private final int fetchSize;
+
+    /** Local execution flag. */
+    private final boolean loc;
+
+    /** Local query flag. */
+    private final boolean locQry;
+
+    /** Collocated query flag. */
+    private final boolean collocatedQry;
+
+    /** Distributed joins flag. */
+    private final boolean distributedJoins;
+
+    /**
+     * @param ignite Ignite.
+     * @param cacheName Cache name.
+     * @param sql Sql query.
+     * @param isQry Operation type flag - query or not - to enforce query type check.
+     * @param loc Local execution flag.
+     * @param args Args.
+     * @param fetchSize Fetch size.
+     * @param uuid UUID.
+     * @param locQry Local query flag.
+     * @param collocatedQry Collocated query flag.
+     * @param distributedJoins Distributed joins flag.
+     */
+    public JdbcQueryTaskV2(Ignite ignite, String cacheName, String sql,
+                           Boolean isQry, boolean loc, Object[] args, int fetchSize, UUID uuid,
+                           boolean locQry, boolean collocatedQry, boolean distributedJoins) {
+        this.ignite = ignite;
+        this.args = args;
+        this.uuid = uuid;
+        this.cacheName = cacheName;
+        this.sql = sql;
+        this.isQry = isQry;
+        this.fetchSize = fetchSize;
+        this.loc = loc;
+        this.locQry = locQry;
+        this.collocatedQry = collocatedQry;
+        this.distributedJoins = distributedJoins;
+    }
+
+    /** {@inheritDoc} */
+    @Override public JdbcQueryTaskV2.QueryResult call() throws Exception {
+        Cursor cursor = CURSORS.get(uuid);
+
+        List<String> tbls = null;
+        List<String> cols = null;
+        List<String> types = null;
+
+        boolean first;
+
+        if (first = (cursor == null)) {
+            IgniteCache<?, ?> cache = ignite.cache(cacheName);
+
+            // Don't create caches on server nodes in order to avoid of data rebalancing.
+            boolean start = ignite.configuration().isClientMode();
+
+            if (cache == null && cacheName == null)
+                cache = ((IgniteKernal)ignite).context().cache().getOrStartPublicCache(start, !loc && locQry);
+
+            if (cache == null) {
+                if (cacheName == null)
+                    throw new SQLException("Failed to execute query. No suitable caches found.");
+                else
+                    throw new SQLException("Cache not found [cacheName=" + cacheName + ']');
+            }
+
+            SqlFieldsQuery qry = (isQry != null ? new JdbcSqlFieldsQuery(sql, isQry) : new SqlFieldsQuery(sql))
+                .setArgs(args);
+
+            qry.setPageSize(fetchSize);
+            qry.setLocal(locQry);
+            qry.setCollocated(collocatedQry);
+            qry.setDistributedJoins(distributedJoins);
+
+            QueryCursorImpl<List<?>> qryCursor = (QueryCursorImpl<List<?>>)cache.query(qry);
+
+            if (isQry == null)
+                isQry = qryCursor.isQuery();
+
+            Collection<GridQueryFieldMetadata> meta = qryCursor.fieldsMeta();
+
+            tbls = new ArrayList<>(meta.size());
+            cols = new ArrayList<>(meta.size());
+            types = new ArrayList<>(meta.size());
+
+            for (GridQueryFieldMetadata desc : meta) {
+                tbls.add(desc.typeName());
+                cols.add(desc.fieldName().toUpperCase());
+                types.add(desc.fieldTypeName());
+            }
+
+            CURSORS.put(uuid, cursor = new Cursor(qryCursor, qryCursor.iterator()));
+        }
+
+        List<List<?>> rows = new ArrayList<>();
+
+        for (List<?> row : cursor) {
+            List<Object> row0 = new ArrayList<>(row.size());
+
+            for (Object val : row)
+                row0.add(val == null || JdbcUtils.isSqlType(val.getClass()) ? val : val.toString());
+
+            rows.add(row0);
+
+            if (rows.size() == fetchSize) // If fetchSize is 0 then unlimited
+                break;
+        }
+
+        boolean finished = !cursor.hasNext();
+
+        if (finished)
+            remove(uuid, cursor);
+        else if (first) {
+            if (!loc)
+                scheduleRemoval(uuid, RMV_DELAY);
+        }
+        else if (!loc && !CURSORS.replace(uuid, cursor, new Cursor(cursor.cursor, cursor.iter)))
+            assert !CURSORS.containsKey(uuid) : "Concurrent cursor modification.";
+
+        assert isQry != null : "Query flag must be set prior to returning result";
+
+        return new QueryResult(uuid, finished, isQry, rows, cols, tbls, types);
+    }
+
+    /**
+     * Schedules removal of stored cursor in case of remote query execution.
+     *
+     * @param uuid Cursor UUID.
+     * @param delay Delay in milliseconds.
+     */
+    private void scheduleRemoval(final UUID uuid, long delay) {
+        assert !loc;
+
+        SCHEDULER.schedule(new CAX() {
+            @Override public void applyx() {
+                while (true) {
+                    Cursor c = CURSORS.get(uuid);
+
+                    if (c == null)
+                        break;
+
+                    // If the cursor was accessed since last scheduling then reschedule.
+                    long untouchedTime = U.currentTimeMillis() - c.lastAccessTime;
+
+                    if (untouchedTime < RMV_DELAY) {
+                        scheduleRemoval(uuid, RMV_DELAY - untouchedTime);
+
+                        break;
+                    }
+                    else if (remove(uuid, c))
+                        break;
+                }
+            }
+        }, delay, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * @param uuid Cursor UUID.
+     * @param c Cursor.
+     * @return {@code true} If succeeded.
+     */
+    private static boolean remove(UUID uuid, Cursor c) {
+        boolean rmv = CURSORS.remove(uuid, c);
+
+        if (rmv)
+            c.cursor.close();
+
+        return rmv;
+    }
+
+    /**
+     * Closes and removes cursor.
+     *
+     * @param uuid Cursor UUID.
+     */
+    static void remove(UUID uuid) {
+        Cursor c = CURSORS.remove(uuid);
+
+        if (c != null)
+            c.cursor.close();
+    }
+
+
+    /**
+     * Result of query execution.
+     */
+    static class QueryResult implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Uuid. */
+        private final UUID uuid;
+
+        /** Finished. */
+        private final boolean finished;
+
+        /** Result type - query or update. */
+        private final boolean isQry;
+
+        /** Rows. */
+        private final List<List<?>> rows;
+
+        /** Tables. */
+        private final List<String> tbls;
+
+        /** Columns. */
+        private final List<String> cols;
+
+        /** Types. */
+        private final List<String> types;
+
+        /**
+         * @param uuid UUID..
+         * @param finished Finished.
+         * @param isQry
+         * @param rows Rows.
+         * @param cols Columns.
+         * @param tbls Tables.
+         * @param types Types.
+         */
+        public QueryResult(UUID uuid, boolean finished, boolean isQry, List<List<?>> rows, List<String> cols,
+            List<String> tbls, List<String> types) {
+            this.isQry = isQry;
+            this.cols = cols;
+            this.uuid = uuid;
+            this.finished = finished;
+            this.rows = rows;
+            this.tbls = tbls;
+            this.types = types;
+        }
+
+        /**
+         * @return Query result rows.
+         */
+        public List<List<?>> getRows() {
+            return rows;
+        }
+
+        /**
+         * @return Tables metadata.
+         */
+        public List<String> getTbls() {
+            return tbls;
+        }
+
+        /**
+         * @return Columns metadata.
+         */
+        public List<String> getCols() {
+            return cols;
+        }
+
+        /**
+         * @return Types metadata.
+         */
+        public List<String> getTypes() {
+            return types;
+        }
+
+        /**
+         * @return Query UUID.
+         */
+        public UUID getUuid() {
+            return uuid;
+        }
+
+        /**
+         * @return {@code True} if it is finished query.
+         */
+        public boolean isFinished() {
+            return finished;
+        }
+
+        /**
+         * @return {@code true} if it is result of a query operation, not update; {@code false} otherwise.
+         */
+        public boolean isQuery() {
+            return isQry;
+        }
+    }
+
+    /**
+     * Cursor.
+     */
+    private static final class Cursor implements Iterable<List<?>> {
+        /** Cursor. */
+        final QueryCursor<List<?>> cursor;
+
+        /** Iterator. */
+        final Iterator<List<?>> iter;
+
+        /** Last access time. */
+        final long lastAccessTime;
+
+        /**
+         * @param cursor Cursor.
+         * @param iter Iterator.
+         */
+        private Cursor(QueryCursor<List<?>> cursor, Iterator<List<?>> iter) {
+            this.cursor = cursor;
+            this.iter = iter;
+            this.lastAccessTime = U.currentTimeMillis();
+        }
+
+        /** {@inheritDoc} */
+        @Override public Iterator<List<?>> iterator() {
+            return iter;
+        }
+
+        /**
+         * @return {@code True} if cursor has next element.
+         */
+        public boolean hasNext() {
+            return iter.hasNext();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
index 1bf5223..c1a5f4c 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcResultSet.java
@@ -45,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -146,6 +147,29 @@ public class JdbcResultSet implements ResultSet {
 
             boolean loc = nodeId == null;
 
+            if (conn.isDmlSupported()) {
+                // Connections from new clients send queries with new tasks, so we have to continue in the same manner
+                JdbcQueryTaskV2 qryTask = new JdbcQueryTaskV2(loc ? ignite : null, conn.cacheName(), null, true, loc, null,
+                    fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+                try {
+                    JdbcQueryTaskV2.QueryResult res =
+                        loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+                    finished = res.isFinished();
+
+                    it = res.getRows().iterator();
+
+                    return next();
+                }
+                catch (IgniteSQLException e) {
+                    throw e.toJdbcException();
+                }
+                catch (Exception e) {
+                    throw new SQLException("Failed to query Ignite.", e);
+                }
+            }
+
             JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), null, loc, null,
                 fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
 
@@ -159,6 +183,9 @@ public class JdbcResultSet implements ResultSet {
 
                 return next();
             }
+            catch (IgniteSQLException e) {
+                throw e.toJdbcException();
+            }
             catch (Exception e) {
                 throw new SQLException("Failed to query Ignite.", e);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
new file mode 100644
index 0000000..1b27296
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcSqlFieldsQuery.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.jdbc2;
+
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SqlFieldsQuery} with JDBC flavor - it has additional flag indicating whether JDBC driver expects
+ * this query to return a result set or an update counter. This class is not intended for use outside JDBC driver.
+ */
+public final class JdbcSqlFieldsQuery extends SqlFieldsQuery {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Flag set by JDBC driver to enforce checks for correct operation type. */
+    private final boolean isQry;
+
+    /**
+     * @param sql SQL query.
+     * @param isQry Flag indicating whether this object denotes a query or an update operation.
+     */
+    JdbcSqlFieldsQuery(String sql, boolean isQry) {
+        super(sql);
+        this.isQry = isQry;
+    }
+
+    /**
+     * @return Flag indicating whether this object denotes a query or an update operation..
+     */
+    public boolean isQuery() {
+        return isQry;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
index e187dc0..dbb2390 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcStatement.java
@@ -17,19 +17,25 @@
 
 package org.apache.ignite.internal.jdbc2;
 
+import java.sql.BatchUpdateException;
 import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
 import java.sql.SQLWarning;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 import static java.sql.ResultSet.CONCUR_READ_ONLY;
 import static java.sql.ResultSet.FETCH_FORWARD;
@@ -44,7 +50,7 @@ public class JdbcStatement implements Statement {
     private static final int DFLT_FETCH_SIZE = 1024;
 
     /** Connection. */
-    private final JdbcConnection conn;
+    protected final JdbcConnection conn;
 
     /** Closed flag. */
     private boolean closed;
@@ -53,10 +59,10 @@ public class JdbcStatement implements Statement {
     private int maxRows;
 
     /** Current result set. */
-    private ResultSet rs;
+    protected ResultSet rs;
 
     /** Query arguments. */
-    protected Object[] args;
+    protected ArrayList<Object> args;
 
     /** Fetch size. */
     private int fetchSize = DFLT_FETCH_SIZE;
@@ -67,6 +73,12 @@ public class JdbcStatement implements Statement {
     /** Fields indexes. */
     Map<String, Integer> fieldsIdxs = new HashMap<>();
 
+    /** Current updated items count. */
+    long updateCnt = -1;
+
+    /** Batch statements. */
+    private List<String> batch;
+
     /**
      * Creates new statement.
      *
@@ -79,12 +91,15 @@ public class JdbcStatement implements Statement {
     }
 
     /** {@inheritDoc} */
+    @SuppressWarnings("deprecation")
     @Override public ResultSet executeQuery(String sql) throws SQLException {
         ensureNotClosed();
 
         rs = null;
 
-        if (sql == null || sql.isEmpty())
+        updateCnt = -1;
+
+        if (F.isEmpty(sql))
             throw new SQLException("SQL query is empty");
 
         Ignite ignite = conn.ignite();
@@ -95,8 +110,8 @@ public class JdbcStatement implements Statement {
 
         boolean loc = nodeId == null;
 
-        JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(),
-            sql, loc, args, fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+        JdbcQueryTask qryTask = new JdbcQueryTask(loc ? ignite : null, conn.cacheName(), sql, loc, getArgs(),
+            fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
 
         try {
             JdbcQueryTask.QueryResult res =
@@ -111,6 +126,9 @@ public class JdbcStatement implements Statement {
 
             return rs;
         }
+        catch (IgniteSQLException e) {
+            throw e.toJdbcException();
+        }
         catch (Exception e) {
             throw new SQLException("Failed to query Ignite.", e);
         }
@@ -120,7 +138,82 @@ public class JdbcStatement implements Statement {
     @Override public int executeUpdate(String sql) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        rs = null;
+
+        updateCnt = -1;
+
+        return doUpdate(sql, getArgs());
+    }
+
+    /**
+     * Run update query.
+     * @param sql SQL query.
+     * @param args Update arguments.
+     * @return Number of affected items.
+     * @throws SQLException
+     */
+    int doUpdate(String sql, Object[] args) throws SQLException {
+        if (F.isEmpty(sql))
+            throw new SQLException("SQL query is empty");
+
+        Ignite ignite = conn.ignite();
+
+        UUID nodeId = conn.nodeId();
+
+        UUID uuid = UUID.randomUUID();
+
+        boolean loc = nodeId == null;
+
+        if (!conn.isDmlSupported())
+            throw new SQLException("Failed to query Ignite: DML operations are supported in versions 1.8.0 and newer");
+
+        JdbcQueryTaskV2 qryTask = new JdbcQueryTaskV2(loc ? ignite : null, conn.cacheName(), sql, false, loc, args,
+            fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+        try {
+            JdbcQueryTaskV2.QueryResult qryRes =
+                loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+            Long res = updateCounterFromQueryResult(qryRes.getRows());
+
+            updateCnt = res;
+
+            return res.intValue();
+        }
+        catch (IgniteSQLException e) {
+            throw e.toJdbcException();
+        }
+        catch (SQLException e) {
+            throw e;
+        }
+        catch (Exception e) {
+            throw new SQLException("Failed to query Ignite.", e);
+        }
+    }
+
+    /**
+     * @param rows query result.
+     * @return update counter, if found
+     * @throws SQLException if getting an update counter from result proved to be impossible.
+     */
+    private static Long updateCounterFromQueryResult(List<List<?>> rows) throws SQLException {
+         if (F.isEmpty(rows))
+            return 0L;
+
+        if (rows.size() != 1)
+            throw new SQLException("Expected number of rows of 1 for update operation");
+
+        List<?> row = rows.get(0);
+
+        if (row.size() != 1)
+            throw new SQLException("Expected row size of 1 for update operation");
+
+        Object objRes = row.get(0);
+
+        if (!(objRes instanceof Long))
+            throw new SQLException("Unexpected update result type");
+
+        return (Long) objRes;
     }
 
     /** {@inheritDoc} */
@@ -220,11 +313,59 @@ public class JdbcStatement implements Statement {
 
     /** {@inheritDoc} */
     @Override public boolean execute(String sql) throws SQLException {
+        if (!conn.isDmlSupported()) {
+            // We attempt to run a query without any checks as long as server does not support DML anyway,
+            // so it simply will throw an exception when given a DML statement instead of a query.
+            rs = executeQuery(sql);
+
+            return true;
+        }
+
         ensureNotClosed();
 
-        rs = executeQuery(sql);
+        rs = null;
+
+        updateCnt = -1;
+
+        if (F.isEmpty(sql))
+            throw new SQLException("SQL query is empty");
+
+        Ignite ignite = conn.ignite();
+
+        UUID nodeId = conn.nodeId();
 
-        return true;
+        UUID uuid = UUID.randomUUID();
+
+        boolean loc = nodeId == null;
+
+        JdbcQueryTaskV2 qryTask = new JdbcQueryTaskV2(loc ? ignite : null, conn.cacheName(), sql, null, loc, getArgs(),
+            fetchSize, uuid, conn.isLocalQuery(), conn.isCollocatedQuery(), conn.isDistributedJoins());
+
+        try {
+            JdbcQueryTaskV2.QueryResult res =
+                loc ? qryTask.call() : ignite.compute(ignite.cluster().forNodeId(nodeId)).call(qryTask);
+
+            if (res.isQuery()) {
+                JdbcResultSet rs = new JdbcResultSet(uuid, this, res.getTbls(), res.getCols(), res.getTypes(),
+                    res.getRows(), res.isFinished());
+
+                rs.setFetchSize(fetchSize);
+
+                resSets.add(rs);
+
+                this.rs = rs;
+            }
+            else
+                updateCnt = updateCounterFromQueryResult(res.getRows());
+
+            return res.isQuery();
+        }
+        catch (IgniteSQLException e) {
+            throw e.toJdbcException();
+        }
+        catch (Exception e) {
+            throw new SQLException("Failed to query Ignite.", e);
+        }
     }
 
     /** {@inheritDoc} */
@@ -242,7 +383,11 @@ public class JdbcStatement implements Statement {
     @Override public int getUpdateCount() throws SQLException {
         ensureNotClosed();
 
-        return -1;
+        long res = updateCnt;
+
+        updateCnt = -1;
+
+        return Long.valueOf(res).intValue();
     }
 
     /** {@inheritDoc} */
@@ -302,21 +447,27 @@ public class JdbcStatement implements Statement {
     @Override public void addBatch(String sql) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        if (F.isEmpty(sql))
+            throw new SQLException("SQL query is empty");
+
+        if (batch == null)
+            batch = new ArrayList<>();
+
+        batch.add(sql);
     }
 
     /** {@inheritDoc} */
     @Override public void clearBatch() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        batch = null;
     }
 
     /** {@inheritDoc} */
     @Override public int[] executeBatch() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        throw new SQLFeatureNotSupportedException("Batch statements are not supported yet.");
     }
 
     /** {@inheritDoc} */
@@ -340,28 +491,37 @@ public class JdbcStatement implements Statement {
     @Override public ResultSet getGeneratedKeys() throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
     }
 
     /** {@inheritDoc} */
     @Override public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        if (autoGeneratedKeys == RETURN_GENERATED_KEYS)
+            throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+
+        return executeUpdate(sql);
     }
 
     /** {@inheritDoc} */
     @Override public int executeUpdate(String sql, int[] colIndexes) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        if (!F.isEmpty(colIndexes))
+            throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+
+        return executeUpdate(sql);
     }
 
     /** {@inheritDoc} */
     @Override public int executeUpdate(String sql, String[] colNames) throws SQLException {
         ensureNotClosed();
 
-        throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        if (!F.isEmpty(colNames))
+            throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
+
+        return executeUpdate(sql);
     }
 
     /** {@inheritDoc} */
@@ -369,7 +529,7 @@ public class JdbcStatement implements Statement {
         ensureNotClosed();
 
         if (autoGeneratedKeys == RETURN_GENERATED_KEYS)
-            throw new SQLFeatureNotSupportedException("Updates are not supported.");
+            throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
 
         return execute(sql);
     }
@@ -378,8 +538,8 @@ public class JdbcStatement implements Statement {
     @Override public boolean execute(String sql, int[] colIndexes) throws SQLException {
         ensureNotClosed();
 
-        if (colIndexes != null && colIndexes.length > 0)
-            throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        if (!F.isEmpty(colIndexes))
+            throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
 
         return execute(sql);
     }
@@ -388,8 +548,8 @@ public class JdbcStatement implements Statement {
     @Override public boolean execute(String sql, String[] colNames) throws SQLException {
         ensureNotClosed();
 
-        if (colNames != null && colNames.length > 0)
-            throw new SQLFeatureNotSupportedException("Updates are not supported.");
+        if (!F.isEmpty(colNames))
+            throw new SQLFeatureNotSupportedException("Auto generated keys are not supported.");
 
         return execute(sql);
     }
@@ -448,11 +608,18 @@ public class JdbcStatement implements Statement {
     }
 
     /**
+     * @return Args for current statement
+     */
+    protected final Object[] getArgs() {
+        return args != null ? args.toArray() : null;
+    }
+
+    /**
      * Ensures that statement is not closed.
      *
      * @throws SQLException If statement is closed.
      */
-    protected void ensureNotClosed() throws SQLException {
+    void ensureNotClosed() throws SQLException {
         if (closed)
             throw new SQLException("Statement is closed.");
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
index b519340..38a838e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc2/JdbcUtils.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.jdbc2;
 
-import java.math.BigDecimal;
 import java.net.URL;
 import java.sql.Time;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.Date;
 
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+
 import static java.sql.Types.BIGINT;
 import static java.sql.Types.BINARY;
 import static java.sql.Types.BOOLEAN;
@@ -132,24 +133,12 @@ public class JdbcUtils {
     }
 
     /**
-     * Checks whether type of the object is SQL-complaint.
+     * Checks whether a class is SQL-compliant.
      *
-     * @param obj Object.
-     * @return Whether type of the object is SQL-complaint.
+     * @param cls Class.
+     * @return Whether given type is SQL-compliant.
      */
-    public static boolean sqlType(Object obj) {
-        return obj == null ||
-            obj instanceof BigDecimal ||
-            obj instanceof Boolean ||
-            obj instanceof Byte ||
-            obj instanceof byte[] ||
-            obj instanceof java.util.Date ||
-            obj instanceof Double ||
-            obj instanceof Float ||
-            obj instanceof Integer ||
-            obj instanceof Long ||
-            obj instanceof Short ||
-            obj instanceof String ||
-            obj instanceof URL;
+    static boolean isSqlType(Class<?> cls) {
+        return GridQueryProcessor.isSqlType(cls) || cls == URL.class;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
index 5a46d65..852c432 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/QueryCursorImpl.java
@@ -17,7 +17,12 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.ResultSet;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -43,7 +48,10 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
         AtomicReferenceFieldUpdater.newUpdater(QueryCursorImpl.class, State.class, "state");
 
     /** Query executor. */
-    private Iterable<T> iterExec;
+    private final Iterable<T> iterExec;
+
+    /** Result type flag - result set or update counter. */
+    private final boolean isQry;
 
     /** */
     private Iterator<T> iter;
@@ -62,8 +70,7 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
      * @param cancel Cancellation closure.
      */
     public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel) {
-        this.iterExec = iterExec;
-        this.cancel = cancel;
+        this(iterExec, cancel, true);
     }
 
     /**
@@ -73,6 +80,16 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
         this(iterExec, null);
     }
 
+    /**
+     * @param iterExec Query executor.
+     * @param isQry Result type flag - {@code true} for query, {@code false} for update operation.
+     */
+    public QueryCursorImpl(Iterable<T> iterExec, GridQueryCancel cancel, boolean isQry) {
+        this.iterExec = iterExec;
+        this.cancel = cancel;
+        this.isQry = isQry;
+    }
+
     /** {@inheritDoc} */
     @Override public Iterator<T> iterator() {
         if (!STATE_UPDATER.compareAndSet(this, IDLE, EXECUTION))
@@ -154,6 +171,14 @@ public class QueryCursorImpl<T> implements QueryCursorEx<T> {
     }
 
     /**
+     * @return {@code true} if this cursor corresponds to a {@link ResultSet} as a result of query,
+     * {@code false} if query was modifying operation like INSERT, UPDATE, or DELETE.
+     */
+    public boolean isQuery() {
+        return isQry;
+    }
+
+    /**
      * @param fieldsMeta SQL Fields query result metadata.
      */
     public void fieldsMeta(List<GridQueryFieldMetadata> fieldsMeta) {


Mime
View raw message