ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [5/8] ignite git commit: IGNITE-2294: Implemented DML.
Date Wed, 23 Nov 2016 09:58:43 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java
new file mode 100644
index 0000000..c3d1951
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/BinarySerialiedFieldComparatorSelfTest.java
@@ -0,0 +1,568 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.binary;
+
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Unit tests for {@link BinarySerializedFieldComparator}.
+ */
+public class BinarySerialiedFieldComparatorSelfTest extends GridCommonAbstractTest {
+    /** Type counter. */
+    private static final AtomicInteger TYPE_CTR = new AtomicInteger();
+
+    /** Single field name. */
+    private static final String FIELD_SINGLE = "single";
+
+    /** Pointers to release. */
+    private final Set<Long> ptrs = new ConcurrentHashSet<>();
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        TYPE_CTR.incrementAndGet(); // build() names the type "Type" + TYPE_CTR, so each test gets its own type name.
+    }
+
+    /** Frees every offheap region registered in {@code ptrs}, forgets the addresses, then runs parent cleanup. */
+    @Override protected void afterTest() throws Exception {
+        for (Long ptr : ptrs)
+            GridUnsafe.freeMemory(ptr);
+        ptrs.clear(); // Drop freed addresses so they can never be released twice.
+        super.afterTest();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg =  super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+
+        return cfg;
+    }
+
+    /**
+     * Test byte fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testByte() throws Exception {
+        checkTwoValues((byte)1, (byte)2);
+    }
+
+    /**
+     * Test boolean fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBoolean() throws Exception {
+        checkTwoValues(true, false);
+    }
+
+    /**
+     * Test short fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShort() throws Exception {
+        checkTwoValues((short)1, (short)2);
+    }
+
+    /**
+     * Test char fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testChar() throws Exception {
+        checkTwoValues('a', 'b');
+    }
+
+    /**
+     * Test int fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInt() throws Exception {
+        checkTwoValues(1, 2);
+    }
+
+    /**
+     * Test long fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLong() throws Exception {
+        checkTwoValues(1L, 2L);
+    }
+
+    /**
+     * Test float fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFloat() throws Exception {
+        checkTwoValues(1.0f, 2.0f);
+    }
+
+    /**
+     * Test double fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDouble() throws Exception {
+        checkTwoValues(1.0d, 2.0d);
+    }
+
+    /**
+     * Test string fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testString() throws Exception {
+        checkTwoValues("str1", "str2");
+    }
+
+    /**
+     * Test date fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDate() throws Exception {
+        long time = System.currentTimeMillis();
+
+        checkTwoValues(new Date(time), new Date(time + 100));
+    }
+
+    /**
+     * Test timestamp fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTimestamp() throws Exception {
+        long time = System.currentTimeMillis();
+
+        checkTwoValues(new Timestamp(time), new Timestamp(time + 100));
+    }
+
+    /**
+     * Test UUID fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUuid() throws Exception {
+        checkTwoValues(UUID.randomUUID(), UUID.randomUUID());
+    }
+
+    /**
+     * Test decimal fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDecimal() throws Exception {
+        checkTwoValues(new BigDecimal("12.3E+7"), new BigDecimal("12.4E+7"));
+        checkTwoValues(new BigDecimal("12.3E+7"), new BigDecimal("12.3E+8"));
+    }
+
+    /**
+     * Test object fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInnerObject() throws Exception {
+        checkTwoValues(new InnerClass(1), new InnerClass(2));
+    }
+
+    /**
+     * Test byte array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testByteArray() throws Exception {
+        checkTwoValues(new byte[] { 1, 2 }, new byte[] { 1, 3 });
+        checkTwoValues(new byte[] { 1, 2 }, new byte[] { 1 });
+        checkTwoValues(new byte[] { 1, 2 }, new byte[] { 3 });
+        checkTwoValues(new byte[] { 1, 2 }, new byte[] { 1, 2, 3 });
+    }
+
+    /**
+     * Test boolean array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testBooleanArray() throws Exception {
+        checkTwoValues(new boolean[] { true, false }, new boolean[] { false, true });
+        checkTwoValues(new boolean[] { true, false }, new boolean[] { true });
+        checkTwoValues(new boolean[] { true, false }, new boolean[] { false });
+        checkTwoValues(new boolean[] { true, false }, new boolean[] { true, false, true });
+    }
+
+    /**
+     * Test short array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testShortArray() throws Exception {
+        checkTwoValues(new short[] { 1, 2 }, new short[] { 1, 3 });
+        checkTwoValues(new short[] { 1, 2 }, new short[] { 1 });
+        checkTwoValues(new short[] { 1, 2 }, new short[] { 3 });
+        checkTwoValues(new short[] { 1, 2 }, new short[] { 1, 2, 3 });
+    }
+
+    /**
+     * Test char array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testCharArray() throws Exception {
+        checkTwoValues(new char[] { 1, 2 }, new char[] { 1, 3 });
+        checkTwoValues(new char[] { 1, 2 }, new char[] { 1 });
+        checkTwoValues(new char[] { 1, 2 }, new char[] { 3 });
+        checkTwoValues(new char[] { 1, 2 }, new char[] { 1, 2, 3 });
+    }
+
+    /**
+     * Test int array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testIntArray() throws Exception {
+        checkTwoValues(new int[] { 1, 2 }, new int[] { 1, 3 });
+        checkTwoValues(new int[] { 1, 2 }, new int[] { 1 });
+        checkTwoValues(new int[] { 1, 2 }, new int[] { 3 });
+        checkTwoValues(new int[] { 1, 2 }, new int[] { 1, 2, 3 });
+    }
+
+    /**
+     * Test long array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testLongArray() throws Exception {
+        checkTwoValues(new long[] { 1, 2 }, new long[] { 1, 3 });
+        checkTwoValues(new long[] { 1, 2 }, new long[] { 1 });
+        checkTwoValues(new long[] { 1, 2 }, new long[] { 3 });
+        checkTwoValues(new long[] { 1, 2 }, new long[] { 1, 2, 3 });
+    }
+
+    /**
+     * Test float array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFloatArray() throws Exception {
+        checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 1.0f, 3.0f });
+        checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 1.0f });
+        checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 3.0f });
+        checkTwoValues(new float[] { 1.0f, 2.0f }, new float[] { 1.0f, 2.0f, 3.0f });
+    }
+
+    /**
+     * Test double array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDoubleArray() throws Exception {
+        checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 1.0d, 3.0d });
+        checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 1.0d });
+        checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 3.0d });
+        checkTwoValues(new double[] { 1.0d, 2.0d }, new double[] { 1.0d, 2.0d, 3.0d });
+    }
+
+    /**
+     * Test string array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testStringArray() throws Exception {
+        checkTwoValues(new String[] { "a", "b" }, new String[] { "a", "c" });
+        checkTwoValues(new String[] { "a", "b" }, new String[] { "a" });
+        checkTwoValues(new String[] { "a", "b" }, new String[] { "c" });
+        checkTwoValues(new String[] { "a", "b" }, new String[] { "a", "b", "c" });
+    }
+
+    /**
+     * Test date array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDateArray() throws Exception {
+        long curTime = System.currentTimeMillis();
+
+        Date v1 = new Date(curTime);
+        Date v2 = new Date(curTime + 1000);
+        Date v3 = new Date(curTime + 2000);
+
+        checkTwoValues(new Date[] { v1, v2 }, new Date[] { v1, v3 });
+        checkTwoValues(new Date[] { v1, v2 }, new Date[] { v1 });
+        checkTwoValues(new Date[] { v1, v2 }, new Date[] { v3 });
+        checkTwoValues(new Date[] { v1, v2 }, new Date[] { v1, v2, v3 });
+    }
+
+    /**
+     * Test timestamp array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testTimestampArray() throws Exception {
+        long curTime = System.currentTimeMillis();
+
+        Timestamp v1 = new Timestamp(curTime);
+        Timestamp v2 = new Timestamp(curTime + 1000);
+        Timestamp v3 = new Timestamp(curTime + 2000);
+
+        checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v1, v3 });
+        checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v1 });
+        checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v3 });
+        checkTwoValues(new Timestamp[] { v1, v2 }, new Timestamp[] { v1, v2, v3 });
+    }
+
+    /**
+     * Test UUID array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testUuidArray() throws Exception {
+        UUID v1 = UUID.randomUUID();
+        UUID v2 = UUID.randomUUID();
+        UUID v3 = UUID.randomUUID();
+
+        checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v1, v3 });
+        checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v1 });
+        checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v3 });
+        checkTwoValues(new UUID[] { v1, v2 }, new UUID[] { v1, v2, v3 });
+    }
+
+    /**
+     * Test decimal array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDecimalArray() throws Exception {
+        BigDecimal v1 = new BigDecimal("12.3E+7");
+        BigDecimal v2 = new BigDecimal("12.4E+7");
+        BigDecimal v3 = new BigDecimal("12.5E+7");
+
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v3 });
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1 });
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v3 });
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v2, v3 });
+
+        v2 = new BigDecimal("12.3E+8");
+        v3 = new BigDecimal("12.3E+9");
+
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v3 });
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1 });
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v3 });
+        checkTwoValues(new BigDecimal[] { v1, v2 }, new BigDecimal[] { v1, v2, v3 });
+    }
+
+    /**
+     * Test object array fields.
+     *
+     * @throws Exception If failed.
+     */
+    public void testInnerObjectArray() throws Exception {
+        InnerClass v1 = new InnerClass(1);
+        InnerClass v2 = new InnerClass(2);
+        InnerClass v3 = new InnerClass(3);
+
+        checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v1, v3 });
+        checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v1 });
+        checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v3 });
+        checkTwoValues(new InnerClass[] { v1, v2 }, new InnerClass[] { v1, v2, v3 });
+    }
+
+    /**
+     * Check two different not-null values for every combination of the two offheap flags.
+     *
+     * @throws Exception If failed.
+     */
+    public void checkTwoValues(Object val1, Object val2) throws Exception {
+        checkTwoValues(val1, val2, false, false);
+        checkTwoValues(val1, val2, false, true);
+        checkTwoValues(val1, val2, true, false);
+        checkTwoValues(val1, val2, true, true);
+    }
+
+    /**
+     * Check all pairings of: field = val1, field = val2, field = null and field absent (null equals absent).
+     *
+     * @param val1 Value 1.
+     * @param val2 Value 2.
+     * @param offheap1 Whether the left-hand object is converted to offheap form before comparison.
+     * @param offheap2 Whether the right-hand object is converted to offheap form before comparison.
+     * @throws Exception If failed.
+     */
+    public void checkTwoValues(Object val1, Object val2, boolean offheap1, boolean offheap2) throws Exception {
+        assertNotNull(val1);
+        assertNotNull(val2);
+
+        compareSingle(convert(buildSingle(val1), offheap1), convert(buildSingle(val1), offheap2), true);
+        compareSingle(convert(buildSingle(val1), offheap1), convert(buildSingle(val2), offheap2), false);
+        compareSingle(convert(buildSingle(val1), offheap1), convert(buildSingle(null), offheap2), false);
+        compareSingle(convert(buildSingle(val1), offheap1), convert(buildEmpty(), offheap2), false);
+
+        compareSingle(convert(buildSingle(val2), offheap1), convert(buildSingle(val1), offheap2), false);
+        compareSingle(convert(buildSingle(val2), offheap1), convert(buildSingle(val2), offheap2), true);
+        compareSingle(convert(buildSingle(val2), offheap1), convert(buildSingle(null), offheap2), false);
+        compareSingle(convert(buildSingle(val2), offheap1), convert(buildEmpty(), offheap2), false);
+
+        compareSingle(convert(buildSingle(null), offheap1), convert(buildSingle(val1), offheap2), false);
+        compareSingle(convert(buildSingle(null), offheap1), convert(buildSingle(val2), offheap2), false);
+        compareSingle(convert(buildSingle(null), offheap1), convert(buildSingle(null), offheap2), true);
+        compareSingle(convert(buildSingle(null), offheap1), convert(buildEmpty(), offheap2), true);
+
+        compareSingle(convert(buildEmpty(), offheap1), convert(buildSingle(val1), offheap2), false);
+        compareSingle(convert(buildEmpty(), offheap1), convert(buildSingle(val2), offheap2), false);
+        compareSingle(convert(buildEmpty(), offheap1), convert(buildSingle(null), offheap2), true);
+        compareSingle(convert(buildEmpty(), offheap1), convert(buildEmpty(), offheap2), true);
+    }
+
+    /**
+     * Compare the single field of two objects through their serialized field comparators.
+     *
+     * @param first First object.
+     * @param second Second object.
+     * @param expRes Expected result of the equality check.
+     */
+    private void compareSingle(BinaryObjectExImpl first, BinaryObjectExImpl second, boolean expRes) {
+        BinarySerializedFieldComparator firstComp = first.createFieldComparator();
+        BinarySerializedFieldComparator secondComp = second.createFieldComparator();
+
+        // Point each comparator at the single field (or at "not found"), then check equality.
+        firstComp.findField(singleFieldOrder(first));
+        secondComp.findField(singleFieldOrder(second));
+
+        assertEquals(expRes, BinarySerializedFieldComparator.equals(firstComp, secondComp));
+    }
+
+    /**
+     * Get single field order.
+     *
+     * @param obj Object.
+     * @return {@code 0} if the object has the single field, {@code BinarySchema.ORDER_NOT_FOUND} otherwise.
+     */
+    private int singleFieldOrder(BinaryObjectExImpl obj) {
+        return obj.hasField(FIELD_SINGLE) ? 0 : BinarySchema.ORDER_NOT_FOUND;
+    }
+
+    /**
+     * Convert binary object to its final state: as is, or as an offheap copy of its byte array.
+     *
+     * @param obj Object.
+     * @param offheap Offheap flag.
+     * @return The same object if {@code offheap} is {@code false}, an offheap-backed copy otherwise.
+     */
+    private BinaryObjectExImpl convert(BinaryObjectExImpl obj, boolean offheap) {
+        if (offheap) {
+            byte[] arr = obj.array();
+
+            long ptr = GridUnsafe.allocateMemory(arr.length);
+
+            ptrs.add(ptr); // Remembered so that afterTest() can free it.
+
+            GridUnsafe.copyMemory(arr, GridUnsafe.BYTE_ARR_OFF, null, ptr, arr.length);
+
+            obj = new BinaryObjectOffheapImpl(obj.context(), ptr, 0, arr.length);
+        }
+
+        return obj;
+    }
+
+    /**
+     * Build object with a single field.
+     *
+     * @param val Value.
+     * @return Result.
+     */
+    private BinaryObjectImpl buildSingle(Object val) {
+        return build(FIELD_SINGLE, val);
+    }
+
+    /**
+     * Build empty object.
+     *
+     * @return Empty object.
+     */
+    private BinaryObjectImpl buildEmpty() {
+        return build();
+    }
+
+    /**
+     * Build object of type {@code "Type" + TYPE_CTR.get()} from field name/value pairs.
+     *
+     * @param parts Alternating field names and values: name1, val1, name2, val2, ...
+     * @return Result.
+     */
+    private BinaryObjectImpl build(Object... parts) {
+        String typeName = "Type" + TYPE_CTR.get();
+
+        BinaryObjectBuilder builder = grid().binary().builder(typeName);
+
+        if (!F.isEmpty(parts)) {
+            for (int i = 0; i < parts.length; )
+                builder.setField((String)parts[i++], parts[i++]); // Arguments evaluate left to right: name, value.
+        }
+
+        return (BinaryObjectImpl) builder.build();
+    }
+
+    /**
+     * Inner class.
+     */
+    @SuppressWarnings("unused")
+    private static class InnerClass {
+        /** Value. */
+        private int val;
+
+        /**
+         * Constructor.
+         *
+         * @param val Value.
+         */
+        public InnerClass(int val) {
+            this.val = val;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
index 150c245..2a177ff 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectsAbstractSelfTest.java
@@ -25,6 +25,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -36,17 +37,22 @@ import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryArrayIdentityResolver;
 import org.apache.ignite.binary.BinaryNameMapper;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.binary.BinaryIdentityResolver;
 import org.apache.ignite.binary.BinaryReader;
+import org.apache.ignite.binary.BinaryTypeConfiguration;
 import org.apache.ignite.binary.BinaryWriter;
 import org.apache.ignite.binary.Binarylizable;
 import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
@@ -55,9 +61,12 @@ import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.binary.BinaryObjectOffheapImpl;
+import org.apache.ignite.binary.BinaryFieldIdentityResolver;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
+import org.apache.ignite.internal.processors.cache.MapCacheStoreStrategy;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -103,13 +112,79 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
 
         cfg.setDiscoverySpi(disco);
 
+        CacheConfiguration cacheCfg = createCacheConfig();
+
+        cacheCfg.setCacheStoreFactory(singletonFactory(new TestStore()));
+
+        CacheConfiguration binKeysCacheCfg = createCacheConfig();
+
+        binKeysCacheCfg.setCacheStoreFactory(singletonFactory(new MapCacheStoreStrategy.MapCacheStore()));
+        binKeysCacheCfg.setStoreKeepBinary(true);
+        binKeysCacheCfg.setName("BinKeysCache");
+
+        cfg.setCacheConfiguration(cacheCfg, binKeysCacheCfg);
+        cfg.setMarshaller(new BinaryMarshaller());
+
+        List<BinaryTypeConfiguration> binTypes = new ArrayList<>();
+
+        binTypes.add(new BinaryTypeConfiguration() {{
+            setTypeName("ArrayHashedKey");
+
+            setIdentityResolver(new BinaryArrayIdentityResolver());
+        }});
+
+        binTypes.add(new BinaryTypeConfiguration() {{
+            setTypeName("FieldsHashedKey");
+
+            BinaryFieldIdentityResolver id = new BinaryFieldIdentityResolver();
+            id.setFieldNames("fld1", "fld3");
+
+            setIdentityResolver(id);
+        }});
+
+        binTypes.add(new BinaryTypeConfiguration() {{
+            setTypeName("CustomHashedKey");
+
+            setIdentityResolver(new IdentityResolver());
+        }});
+
+        binTypes.add(new BinaryTypeConfiguration() {{
+            setTypeName(ComplexBinaryFieldsListHashedKey.class.getName());
+
+            BinaryFieldIdentityResolver id = new BinaryFieldIdentityResolver();
+
+            id.setFieldNames("secondField", "thirdField");
+
+            setIdentityResolver(id);
+        }});
+
+        BinaryConfiguration binCfg = new BinaryConfiguration();
+        binCfg.setTypeConfigurations(binTypes);
+
+        cfg.setBinaryConfiguration(binCfg);
+
+        CacheKeyConfiguration arrayHashCfg = new CacheKeyConfiguration("ArrayHashedKey", "fld1");
+        CacheKeyConfiguration fieldsHashCfg = new CacheKeyConfiguration("FieldsHashedKey", "fld1");
+        CacheKeyConfiguration customHashCfg = new CacheKeyConfiguration("CustomHashedKey", "fld1");
+
+        cfg.setCacheKeyConfiguration(arrayHashCfg, fieldsHashCfg, customHashCfg);
+
+        GridCacheBinaryObjectsAbstractSelfTest.cfg = cfg;
+
+        return cfg;
+    }
+
+    /**
+     * @return Cache configuration with basic settings.
+     */
+    @SuppressWarnings("unchecked")
+    private CacheConfiguration createCacheConfig() {
         CacheConfiguration cacheCfg = new CacheConfiguration();
 
         cacheCfg.setCacheMode(cacheMode());
         cacheCfg.setAtomicityMode(atomicityMode());
         cacheCfg.setNearConfiguration(nearConfiguration());
         cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
-        cacheCfg.setCacheStoreFactory(singletonFactory(new TestStore()));
         cacheCfg.setReadThrough(true);
         cacheCfg.setWriteThrough(true);
         cacheCfg.setLoadPreviousValue(true);
@@ -120,13 +195,7 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
             cacheCfg.setOffHeapMaxMemory(0);
         }
 
-        cfg.setCacheConfiguration(cacheCfg);
-
-        cfg.setMarshaller(new BinaryMarshaller());
-
-        this.cfg = cfg;
-
-        return cfg;
+        return cacheCfg;
     }
 
     /**
@@ -924,6 +993,125 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
     }
 
     /**
+     *
+     */
+    @SuppressWarnings("unchecked")
+    public void testCrossFormatObjectsIdentity() {
+        IgniteCache c = binKeysCache();
+
+        c.put(new ComplexBinaryFieldsListHashedKey(), "zzz");
+
+        // Now let's build an identical key for get
+        BinaryObjectBuilder bldr = grid(0).binary().builder(ComplexBinaryFieldsListHashedKey.class.getName());
+
+        bldr.setField("firstField", 365);
+        bldr.setField("secondField", "value");
+        bldr.setField("thirdField", 0x1020304050607080L);
+
+        BinaryObject binKey = bldr.build();
+
+        assertEquals("zzz", c.get(binKey));
+    }
+
+    /**
+     * "ArrayHashedKey" keys: a key equal in every field must hit, a key with a different {@code fld2} must miss.
+     */
+    @SuppressWarnings("unchecked")
+    public void testPutWithArrayHashing() {
+        IgniteCache c = binKeysCache();
+
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("ArrayHashedKey");
+
+            BinaryObject binKey = bldr.setField("fld1", 5).setField("fld2", 1).setField("fld3", "abc").build();
+
+            c.put(binKey, "zzz");
+        }
+
+        // Now let's build an identical key for get.
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("ArrayHashedKey");
+
+            BinaryObject binKey = bldr.setField("fld1", 5).setField("fld2", 1).setField("fld3", "abc").build();
+
+            assertEquals("zzz", c.get(binKey));
+        }
+
+        // Now let's build a non-identical key for get (fld2 differs).
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("ArrayHashedKey");
+
+            BinaryObject binKey = bldr.setField("fld1", 5).setField("fld2", 100).setField("fld3", "abc").build();
+
+            assertNull(c.get(binKey));
+        }
+    }
+
+    /**
+     * "FieldsHashedKey" keys (identity fields fld1 and fld3): a key differing only in {@code fld2} must still hit.
+     */
+    @SuppressWarnings("unchecked")
+    public void testPutWithFieldsHashing() {
+        IgniteCache c = binKeysCache();
+
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("FieldsHashedKey");
+
+            bldr.setField("fld1", 5);
+            bldr.setField("fld2", 1);
+            bldr.setField("fld3", "abc");
+
+            BinaryObject binKey = bldr.build();
+
+            c.put(binKey, "zzz");
+        }
+
+        // Now let's build a key for get that matches on the identity fields only.
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("FieldsHashedKey");
+
+            bldr.setField("fld1", 5);
+            bldr.setField("fld2", 100); // This one does not participate in hashing
+            bldr.setField("fld3", "abc");
+
+            BinaryObject binKey = bldr.build();
+
+            assertEquals("zzz", c.get(binKey));
+        }
+    }
+
+    /**
+     * "CustomHashedKey" keys (custom resolver on fld1): a key differing only in {@code fld2} must still hit.
+     */
+    @SuppressWarnings("unchecked")
+    public void testPutWithCustomHashing() {
+        IgniteCache c = binKeysCache();
+
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("CustomHashedKey");
+
+            bldr.setField("fld1", 5);
+            bldr.setField("fld2", "abc");
+
+            BinaryObject binKey = bldr.build();
+
+            c.put(binKey, "zzz");
+        }
+
+        // Now let's build a key for get with the same fld1 but a different fld2.
+        {
+            BinaryObjectBuilder bldr = grid(0).binary().builder("CustomHashedKey");
+
+            bldr.setField("fld1", 5);
+            bldr.setField("fld2", "xxx");
+
+            BinaryObject binKey = bldr.build();
+
+            assertEquals("zzz", c.get(binKey));
+        }
+    }
+
+    /**
      * @throws Exception if failed.
      */
     public void testKeepBinaryTxOverwrite() throws Exception {
@@ -1034,6 +1222,13 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
     }
 
     /**
+     * @return Cache tuned to utilize classless binary objects as keys.
+     */
+    private <K, V> IgniteCache<K, V> binKeysCache() {
+        return ignite(0).cache("BinKeysCache").withKeepBinary();
+    }
+
+    /**
      * @param key Key.
      * @throws Exception If failed.
      */
@@ -1221,4 +1416,53 @@ public abstract class GridCacheBinaryObjectsAbstractSelfTest extends GridCommonA
             // No-op.
         }
     }
+
+    /**
+     * Identity resolver that hashes and compares binary objects by field {@code fld1} only.
+     */
+    private final static class IdentityResolver implements BinaryIdentityResolver {
+        /** {@inheritDoc} */
+        @Override public int hashCode(BinaryObject builder) {
+            return (Integer) builder.field("fld1") * 31 / 5; // NOTE(review): NPE on unboxing if field() yields null.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(BinaryObject o1, BinaryObject o2) {
+            return o1 == o2 || (o1 != null && o2 != null && F.eq(o1.field("fld1"), o2.field("fld1")));
+
+        }
+    }
+
+    /**
+     * Key to test puts and gets with; equality and hash code use secondField and thirdField only.
+     */
+    @SuppressWarnings({"ConstantConditions", "unused"})
+    private final static class ComplexBinaryFieldsListHashedKey {
+        /** Not used by equals/hashCode. */
+        private final Integer firstField = 1;
+
+        /** Used by equals/hashCode. */
+        private final String secondField = "value";
+
+        /** Used by equals/hashCode. */
+        private final Long thirdField = 0x1020304050607080L;
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            ComplexBinaryFieldsListHashedKey that = (ComplexBinaryFieldsListHashedKey) o;
+
+            return secondField.equals(that.secondField) &&
+                thirdField.equals(that.thirdField);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            int res = secondField.hashCode();
+            res = 31 * res + thirdField.hashCode();
+            return res;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
index c1d9974..3496dbf 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
@@ -18,19 +18,23 @@
 package org.apache.ignite.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.internal.binary.BinaryArrayIdentityResolverSelfTest;
 import org.apache.ignite.internal.binary.BinaryBasicIdMapperSelfTest;
 import org.apache.ignite.internal.binary.BinaryBasicNameMapperSelfTest;
 import org.apache.ignite.internal.binary.BinaryConfigurationConsistencySelfTest;
 import org.apache.ignite.internal.binary.BinaryEnumsSelfTest;
+import org.apache.ignite.internal.binary.BinaryFieldIdentityResolverSelfTest;
 import org.apache.ignite.internal.binary.BinaryFieldsHeapSelfTest;
 import org.apache.ignite.internal.binary.BinaryFieldsOffheapSelfTest;
 import org.apache.ignite.internal.binary.BinaryFooterOffsetsHeapSelfTest;
 import org.apache.ignite.internal.binary.BinaryFooterOffsetsOffheapSelfTest;
+import org.apache.ignite.internal.binary.BinaryIdentityResolverConfigurationSelfTest;
 import org.apache.ignite.internal.binary.BinaryMarshallerSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectBuilderAdditionalSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectBuilderDefaultMappersSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectBuilderSimpleNameLowerCaseMappersSelfTest;
 import org.apache.ignite.internal.binary.BinaryObjectToStringSelfTest;
+import org.apache.ignite.internal.binary.BinarySerialiedFieldComparatorSelfTest;
 import org.apache.ignite.internal.binary.BinarySimpleNameTestPropertySelfTest;
 import org.apache.ignite.internal.binary.BinaryTreeSelfTest;
 import org.apache.ignite.internal.binary.GridBinaryAffinityKeySelfTest;
@@ -89,6 +93,12 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite {
 
         suite.addTestSuite(BinaryTreeSelfTest.class);
         suite.addTestSuite(BinaryMarshallerSelfTest.class);
+
+        suite.addTestSuite(BinarySerialiedFieldComparatorSelfTest.class);
+        suite.addTestSuite(BinaryArrayIdentityResolverSelfTest.class);
+        suite.addTestSuite(BinaryFieldIdentityResolverSelfTest.class);
+        suite.addTestSuite(BinaryIdentityResolverConfigurationSelfTest.class);
+
         suite.addTestSuite(BinaryConfigurationConsistencySelfTest.class);
         suite.addTestSuite(GridBinaryMarshallerCtxDisabledSelfTest.class);
         suite.addTestSuite(BinaryObjectBuilderDefaultMappersSelfTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
new file mode 100644
index 0000000..8dcba2f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+/**
+ * Two step map-reduce style query.
+ */
+public class GridCacheTwoStepQuery {
+    /** */
+    public static final int DFLT_PAGE_SIZE = 1000;
+
+    /** */
+    @GridToStringInclude
+    private List<GridCacheSqlQuery> mapQrys = new ArrayList<>();
+
+    /** */
+    @GridToStringInclude
+    private GridCacheSqlQuery rdc;
+
+    /** */
+    private int pageSize = DFLT_PAGE_SIZE;
+
+    /** */
+    private boolean explain;
+
+    /** */
+    private Collection<String> spaces;
+
+    /** */
+    private Set<String> schemas;
+
+    /** */
+    private Set<String> tbls;
+
+    /** */
+    private boolean distributedJoins;
+
+    /** */
+    private boolean skipMergeTbl;
+
+    /** */
+    private List<Integer> caches;
+
+    /** */
+    private List<Integer> extraCaches;
+
+    /**
+     * @param schemas Schema names in query.
+     * @param tbls Tables in query.
+     */
+    public GridCacheTwoStepQuery(Set<String> schemas, Set<String> tbls) {
+        this.schemas = schemas;
+        this.tbls = tbls;
+    }
+
+    /**
+     * Specify if distributed joins are enabled for this query.
+     *
+     * @param distributedJoins Distributed joins enabled.
+     */
+    public void distributedJoins(boolean distributedJoins) {
+        this.distributedJoins = distributedJoins;
+    }
+
+    /**
+     * Check if distributed joins are enabled for this query.
+     *
+     * @return {@code true} If distributed joins are enabled.
+     */
+    public boolean distributedJoins() {
+        return distributedJoins;
+    }
+
+
+    /**
+     * @return {@code True} if reduce query can skip merge table creation and get data directly from merge index.
+     */
+    public boolean skipMergeTable() {
+        return skipMergeTbl;
+    }
+
+    /**
+     * @param skipMergeTbl Skip merge table.
+     */
+    public void skipMergeTable(boolean skipMergeTbl) {
+        this.skipMergeTbl = skipMergeTbl;
+    }
+
+    /**
+     * @return If this is explain query.
+     */
+    public boolean explain() {
+        return explain;
+    }
+
+    /**
+     * @param explain If this is explain query.
+     */
+    public void explain(boolean explain) {
+        this.explain = explain;
+    }
+
+    /**
+     * @param pageSize Page size.
+     */
+    public void pageSize(int pageSize) {
+        this.pageSize = pageSize;
+    }
+
+    /**
+     * @return Page size.
+     */
+    public int pageSize() {
+        return pageSize;
+    }
+
+    /**
+     * @param qry SQL Query.
+     * @return {@code this}.
+     */
+    public GridCacheTwoStepQuery addMapQuery(GridCacheSqlQuery qry) {
+        mapQrys.add(qry);
+
+        return this;
+    }
+
+    /**
+     * @return Reduce query.
+     */
+    public GridCacheSqlQuery reduceQuery() {
+        return rdc;
+    }
+
+    /**
+     * @param rdc Reduce query.
+     */
+    public void reduceQuery(GridCacheSqlQuery rdc) {
+        this.rdc = rdc;
+    }
+
+    /**
+     * @return Map queries.
+     */
+    public List<GridCacheSqlQuery> mapQueries() {
+        return mapQrys;
+    }
+
+    /**
+     * @return Caches.
+     */
+    public List<Integer> caches() {
+        return caches;
+    }
+
+    /**
+     * @param caches Caches.
+     */
+    public void caches(List<Integer> caches) {
+        this.caches = caches;
+    }
+
+    /**
+     * @return Caches.
+     */
+    public List<Integer> extraCaches() {
+        return extraCaches;
+    }
+
+    /**
+     * @param extraCaches Caches.
+     */
+    public void extraCaches(List<Integer> extraCaches) {
+        this.extraCaches = extraCaches;
+    }
+
+    /**
+     * @return Spaces.
+     */
+    public Collection<String> spaces() {
+        return spaces;
+    }
+
+    /**
+     * @param spaces Spaces.
+     */
+    public void spaces(Collection<String> spaces) {
+        this.spaces = spaces;
+    }
+
+    /**
+     * @return Schemas.
+     */
+    public Set<String> schemas() {
+        return schemas;
+    }
+
+    /**
+     * @param args New arguments to copy with.
+     * @return Copy.
+     */
+    public GridCacheTwoStepQuery copy(Object[] args) {
+        assert !explain;
+
+        GridCacheTwoStepQuery cp = new GridCacheTwoStepQuery(schemas, tbls);
+
+        cp.caches = caches;
+        cp.extraCaches = extraCaches;
+        cp.spaces = spaces;
+        cp.rdc = rdc.copy(args);
+        cp.skipMergeTbl = skipMergeTbl;
+        cp.pageSize = pageSize;
+        cp.distributedJoins = distributedJoins;
+
+        for (int i = 0; i < mapQrys.size(); i++)
+            cp.mapQrys.add(mapQrys.get(i).copy(args));
+
+        return cp;
+    }
+
+    /**
+     * @return Tables.
+     */
+    public Set<String> tables() {
+        return tbls;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheTwoStepQuery.class, this);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
new file mode 100644
index 0000000..7634965
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -0,0 +1,1027 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryArrayIdentityResolver;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.CacheOperationContext;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheUtils;
+import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
+import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
+import org.apache.ignite.internal.processors.query.h2.dml.KeyValueSupplier;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
+import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
+import org.h2.command.Prepared;
+import org.h2.jdbc.JdbcPreparedStatement;
+import org.h2.table.Column;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
+/**
+ *
+ */
+public class DmlStatementsProcessor {
+    /** Default number of attempts to re-run DELETE and UPDATE queries in case of concurrent modifications of values. */
+    private final static int DFLT_DML_RERUN_ATTEMPTS = 4;
+
+    /** Indexing. */
+    private final IgniteH2Indexing indexing;
+
+    /** Set of binary type ids for which warning about missing identity in configuration has been printed. */
+    private final static Set<Integer> WARNED_TYPES =
+        Collections.newSetFromMap(new ConcurrentHashMap8<Integer, Boolean>());
+
+    /** Default size for update plan cache. */
+    private static final int PLAN_CACHE_SIZE = 1024;
+
+    /** Update plans cache. */
+    private final ConcurrentMap<String, ConcurrentMap<String, UpdatePlan>> planCache = new ConcurrentHashMap<>();
+
+    /** Dummy metadata for update result. */
+    private final static List<GridQueryFieldMetadata> UPDATE_RESULT_META = Collections.<GridQueryFieldMetadata>
+        singletonList(new IgniteH2Indexing.SqlFieldMetadata(null, null, "UPDATED", Long.class.getName()));
+
+    /**
+     * @param indexing indexing.
+     */
+    DmlStatementsProcessor(IgniteH2Indexing indexing) {
+        this.indexing = indexing;
+    }
+
+    /**
+     * Execute DML statement, possibly with a few re-attempts in case of concurrent data modifications.
+     *
+     * @param spaceName Space name.
+     * @param stmt JDBC statement.
+     * @param fieldsQry Original query.
+     * @param loc Query locality flag.
+     * @param filters Space name and key filter.
+     * @param cancel Cancel.
+     * @return Number of modified items.
+     * @throws IgniteCheckedException if failed.
+     */
+    private long updateSqlFields(String spaceName, PreparedStatement stmt, SqlFieldsQuery fieldsQry,
+        boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
+        Object[] errKeys = null;
+
+        long items = 0;
+
+        UpdatePlan plan = getPlanForStatement(spaceName, stmt, null);
+
+        for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
+            UpdateResult r = executeUpdateStatement(plan.tbl.rowDescriptor().context(), stmt, fieldsQry, loc, filters,
+                cancel, errKeys);
+
+            if (F.isEmpty(r.errKeys))
+                return r.cnt + items;
+            else {
+                items += r.cnt;
+                errKeys = r.errKeys;
+            }
+        }
+
+        throw new IgniteSQLException("Failed to update or delete some keys: " + Arrays.deepToString(errKeys),
+            IgniteQueryErrorCode.CONCURRENT_UPDATE);
+    }
+
+    /**
+     * @param spaceName Space name.
+     * @param stmt Prepared statement.
+     * @param fieldsQry Initial query.
+     * @param cancel Query cancel.
+     * @return Update result wrapped into {@link QueryCursorImpl}
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    QueryCursorImpl<List<?>> updateSqlFieldsTwoStep(String spaceName, PreparedStatement stmt,
+        SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
+        long res = updateSqlFields(spaceName, stmt, fieldsQry, false, null, cancel);
+
+        return cursorForUpdateResult(res);
+    }
+
+    /**
+     * Execute DML statement on local cache.
+     * @param spaceName Space name.
+     * @param stmt Prepared statement.
+     * @param filters Space name and key filter.
+     * @param cancel Query cancel.
+     * @return Update result wrapped into {@link GridQueryFieldsResult}
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    GridQueryFieldsResult updateLocalSqlFields(String spaceName, PreparedStatement stmt,
+        SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
+        long res = updateSqlFields(spaceName, stmt, fieldsQry, true, filters, cancel);
+
+        return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
+            new IgniteSingletonIterator(Collections.singletonList(res)));
+    }
+
+    /**
+     * Actually perform SQL DML operation locally.
+     * @param cctx Cache context.
+     * @param prepStmt Prepared statement for DML query.
+     * @param filters Space name and key filter.
+     * @param failedKeys Keys to restrict UPDATE and DELETE operations with. Null or empty array means no restriction.
+     * @return Pair [number of successfully processed items; keys that have failed to be processed]
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private UpdateResult executeUpdateStatement(final GridCacheContext cctx, PreparedStatement prepStmt,
+        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel, Object[] failedKeys)
+        throws IgniteCheckedException {
+        Integer errKeysPos = null;
+
+        if (!F.isEmpty(failedKeys))
+            errKeysPos = F.isEmpty(fieldsQry.getArgs()) ? 1 : fieldsQry.getArgs().length + 1;
+
+        UpdatePlan plan = getPlanForStatement(cctx.name(), prepStmt, errKeysPos);
+
+        Object[] params = fieldsQry.getArgs();
+
+        if (plan.fastUpdateArgs != null) {
+            assert F.isEmpty(failedKeys) && errKeysPos == null;
+
+            return new UpdateResult(doSingleUpdate(plan, params), X.EMPTY_OBJECT_ARRAY);
+        }
+
+        assert !F.isEmpty(plan.selectQry);
+
+        QueryCursorImpl<List<?>> cur;
+
+        // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
+        // subquery and not some dummy stuff like "select 1, 2, 3;"
+        if (!loc && !plan.isLocSubqry) {
+            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
+                .setArgs(params)
+                .setDistributedJoins(fieldsQry.isDistributedJoins())
+                .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
+                .setLocal(fieldsQry.isLocal())
+                .setPageSize(fieldsQry.getPageSize())
+                .setTimeout(fieldsQry.getTimeout(), TimeUnit.MILLISECONDS);
+
+            cur = (QueryCursorImpl<List<?>>) indexing.queryTwoStep(cctx, newFieldsQry, cancel);
+        }
+        else {
+            final GridQueryFieldsResult res = indexing.queryLocalSqlFields(cctx.name(), plan.selectQry, F.asList(params),
+                filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
+
+            cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
+                @Override public Iterator<List<?>> iterator() {
+                    try {
+                        return new GridQueryCacheObjectsIterator(res.iterator(), cctx, cctx.keepBinary());
+                    }
+                    catch (IgniteCheckedException e) {
+                        throw new IgniteException(e);
+                    }
+                }
+            }, cancel);
+
+            cur.fieldsMeta(res.metaData());
+        }
+
+        int pageSize = loc ? 0 : fieldsQry.getPageSize();
+
+        switch (plan.mode) {
+            case MERGE:
+                return new UpdateResult(doMerge(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+            case INSERT:
+                return new UpdateResult(doInsert(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+
+            case UPDATE:
+                return doUpdate(plan, cur, pageSize);
+
+            case DELETE:
+                return doDelete(cctx, cur, pageSize);
+
+            default:
+                throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']',
+                    IgniteQueryErrorCode.UNEXPECTED_OPERATION);
+        }
+    }
+
+    /**
+     * Generate SELECT statements to retrieve data for modifications and find fast UPDATE or DELETE args,
+     * if available.
+     * @param spaceName Space name.
+     * @param prepStmt JDBC statement.
+     * @return Update plan.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    private UpdatePlan getPlanForStatement(String spaceName, PreparedStatement prepStmt,
+        @Nullable Integer errKeysPos) throws IgniteCheckedException {
+        Prepared p = GridSqlQueryParser.prepared((JdbcPreparedStatement) prepStmt);
+
+        spaceName = F.isEmpty(spaceName) ? "default" : spaceName;
+
+        ConcurrentMap<String, UpdatePlan> spacePlans = planCache.get(spaceName);
+
+        if (spacePlans == null) {
+            spacePlans = new GridBoundedConcurrentLinkedHashMap<>(PLAN_CACHE_SIZE);
+
+            spacePlans = U.firstNotNull(planCache.putIfAbsent(spaceName, spacePlans), spacePlans);
+        }
+
+        // getSQL returns field value, so it's fast
+        // Don't look for re-runs in cache, we don't cache them
+        UpdatePlan res = (errKeysPos == null ? spacePlans.get(p.getSQL()) : null);
+
+        if (res != null)
+            return res;
+
+        res = UpdatePlanBuilder.planForStatement(p, errKeysPos);
+
+        // Don't cache re-runs
+        if (errKeysPos == null)
+            return U.firstNotNull(spacePlans.putIfAbsent(p.getSQL(), res), res);
+        else
+            return res;
+    }
+
+    /**
+     * Perform single cache operation based on given args.
+     * @param params Query parameters.
+     * @return 1 if an item was affected, 0 otherwise.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    private static long doSingleUpdate(UpdatePlan plan, Object[] params) throws IgniteCheckedException {
+        GridCacheContext cctx = plan.tbl.rowDescriptor().context();
+
+        FastUpdateArguments singleUpdate = plan.fastUpdateArgs;
+
+        assert singleUpdate != null;
+
+        int res;
+
+        Object key = singleUpdate.key.apply(params);
+        Object val = singleUpdate.val.apply(params);
+        Object newVal = singleUpdate.newVal.apply(params);
+
+        if (newVal != null) { // Single item UPDATE
+            if (val == null) // No _val bound in source query
+                res = cctx.cache().replace(key, newVal) ? 1 : 0;
+            else
+                res = cctx.cache().replace(key, val, newVal) ? 1 : 0;
+        }
+        else { // Single item DELETE
+            if (val == null) // No _val bound in source query
+                res = cctx.cache().remove(key) ? 1 : 0;
+            else
+                res = cctx.cache().remove(key, val) ? 1 : 0;
+        }
+
+        return res;
+    }
+
+    /**
+     * Perform DELETE operation on top of results of SELECT.
+     * @param cctx Cache context.
+     * @param cursor SELECT results.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Results of DELETE (number of items affected AND keys that failed to be updated).
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
+    private UpdateResult doDelete(GridCacheContext cctx, QueryCursorImpl<List<?>> cursor, int pageSize)
+        throws IgniteCheckedException {
+        // With DELETE, we have only two columns - key and value.
+        long res = 0;
+
+        CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+        if (cctx.binaryMarshaller()) {
+            CacheOperationContext newOpCtx = null;
+
+            if (opCtx == null)
+                // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+                newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+            else if (!opCtx.isKeepBinary())
+                newOpCtx = opCtx.keepBinary();
+
+            if (newOpCtx != null)
+                cctx.operationContextPerCall(newOpCtx);
+        }
+
+        // Keys that failed to DELETE due to concurrent updates.
+        List<Object> failedKeys = new ArrayList<>();
+
+        SQLException resEx = null;
+
+        try {
+            Iterator<List<?>> it = cursor.iterator();
+            Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
+
+            while (it.hasNext()) {
+                List<?> e = it.next();
+                if (e.size() != 2) {
+                    U.warn(indexing.getLogger(), "Invalid row size on DELETE - expected 2, got " + e.size());
+                    continue;
+                }
+
+                rows.put(e.get(0), new ModifyingEntryProcessor(e.get(1), RMV));
+
+                if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
+                    PageProcessingResult pageRes = processPage(cctx, rows);
+
+                    res += pageRes.cnt;
+
+                    failedKeys.addAll(F.asList(pageRes.errKeys));
+
+                    if (pageRes.ex != null) {
+                        if (resEx == null)
+                            resEx = pageRes.ex;
+                        else
+                            resEx.setNextException(pageRes.ex);
+                    }
+
+                    if (it.hasNext())
+                        rows.clear(); // No need to clear after the last batch.
+                }
+            }
+
+            if (resEx != null) {
+                if (!F.isEmpty(failedKeys)) {
+                    // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+                    // had been modified concurrently right away.
+                    String msg = "Failed to DELETE some keys because they had been modified concurrently " +
+                        "[keys=" + failedKeys + ']';
+
+                    SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+                    conEx.setNextException(resEx);
+
+                    resEx = conEx;
+                }
+
+                throw new IgniteSQLException(resEx);
+            }
+        }
+        finally {
+            cctx.operationContextPerCall(opCtx);
+        }
+
+        return new UpdateResult(res, failedKeys.toArray());
+    }
+
+    /**
+     * Perform UPDATE operation on top of results of SELECT.
+     * @param cursor SELECT results.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Results of UPDATE (number of items affected AND keys whose values had been modified concurrently,
+     *     i.e. arguments for a re-run).
+     */
+    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
+    private UpdateResult doUpdate(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize)
+        throws IgniteCheckedException {
+        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+        GridCacheContext cctx = desc.context();
+
+        boolean bin = cctx.binaryMarshaller();
+
+        String[] updatedColNames = plan.colNames;
+
+        int valColIdx = plan.valColIdx;
+
+        boolean hasNewVal = (valColIdx != -1);
+
+        // Statement updates distinct properties if it does not have _val in updated columns list
+        // or if its list of updated columns includes more than just _val, i.e. has more than one element.
+        boolean hasProps = !hasNewVal || updatedColNames.length > 1;
+
+        long res = 0;
+
+        CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+        // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+        if (cctx.binaryMarshaller()) {
+            CacheOperationContext newOpCtx = null;
+
+            if (opCtx == null)
+                // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+                newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+            else if (!opCtx.isKeepBinary())
+                newOpCtx = opCtx.keepBinary();
+
+            if (newOpCtx != null)
+                cctx.operationContextPerCall(newOpCtx);
+        }
+
+        Map<Object, EntryProcessor<Object, Object, Boolean>> rows = new LinkedHashMap<>();
+
+        // Keys that failed to UPDATE due to concurrent updates.
+        List<Object> failedKeys = new ArrayList<>();
+
+        SQLException resEx = null;
+
+        try {
+            Iterator<List<?>> it = cursor.iterator();
+
+            while (it.hasNext()) {
+                List<?> e = it.next();
+                Object key = e.get(0);
+                Object val = (hasNewVal ? e.get(valColIdx) : e.get(1));
+
+                Object newVal;
+
+                Map<String, Object> newColVals = new HashMap<>();
+
+                for (int i = 0; i < plan.colNames.length; i++) {
+                    if (hasNewVal && i == valColIdx - 2)
+                        continue;
+
+                    newColVals.put(plan.colNames[i], e.get(i + 2));
+                }
+
+                newVal = plan.valSupplier.apply(e);
+
+                if (bin && !(val instanceof BinaryObject))
+                    val = cctx.grid().binary().toBinary(val);
+
+                // Skip key and value - that's why we start off with 2nd column
+                for (int i = 0; i < plan.tbl.getColumns().length - 2; i++) {
+                    Column c = plan.tbl.getColumn(i + 2);
+
+                    boolean hasNewColVal = newColVals.containsKey(c.getName());
+
+                    // Binary objects get old field values from the Builder, so we can skip what we're not updating
+                    if (bin && !hasNewColVal)
+                        continue;
+
+                    Object colVal = hasNewColVal ? newColVals.get(c.getName()) : desc.columnValue(key, val, i);
+
+                    desc.setColumnValue(key, newVal, colVal, i);
+                }
+
+                if (bin && hasProps) {
+                    assert newVal instanceof BinaryObjectBuilder;
+
+                    newVal = ((BinaryObjectBuilder) newVal).build();
+                }
+
+                Object srcVal = e.get(1);
+
+                if (bin && !(srcVal instanceof BinaryObject))
+                    srcVal = cctx.grid().binary().toBinary(srcVal);
+
+                rows.put(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
+
+                if ((pageSize > 0 && rows.size() == pageSize) || (!it.hasNext())) {
+                    PageProcessingResult pageRes = processPage(cctx, rows);
+
+                    res += pageRes.cnt;
+
+                    failedKeys.addAll(F.asList(pageRes.errKeys));
+
+                    if (pageRes.ex != null) {
+                        if (resEx == null)
+                            resEx = pageRes.ex;
+                        else
+                            resEx.setNextException(pageRes.ex);
+                    }
+
+                    if (it.hasNext())
+                        rows.clear(); // No need to clear after the last batch.
+                }
+            }
+
+            if (resEx != null) {
+                if (!F.isEmpty(failedKeys)) {
+                    // Don't go for a re-run if processing of some keys yielded exceptions and report keys that
+                    // had been modified concurrently right away.
+                    String msg = "Failed to UPDATE some keys because they had been modified concurrently " +
+                        "[keys=" + failedKeys + ']';
+
+                    SQLException dupEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
+
+                    dupEx.setNextException(resEx);
+
+                    resEx = dupEx;
+                }
+
+                throw new IgniteSQLException(resEx);
+            }
+        }
+        finally {
+            cctx.operationContextPerCall(opCtx);
+        }
+
+        return new UpdateResult(res, failedKeys.toArray());
+    }
+
+    /**
+     * Separates the keys reported by {@code invokeAll} into those that were merely skipped by the entry processor
+     * (duplicates or concurrently modified entries) and those whose processing threw an exception.
+     *
+     * @param res Result of {@link GridCacheAdapter#invokeAll}.
+     * @return Triple [array of skipped (duplicated/concurrently modified) keys; head of the chain of SQL exceptions
+     *     built for keys whose processing failed, or {@code null} if no key failed with an exception; number of
+     *     keys that failed with an exception].
+     */
+    private static PageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) {
+        // Start off treating every reported key as skipped; keys that threw are removed from this set below.
+        Set<Object> skippedKeys = new LinkedHashSet<>(res.keySet());
+
+        SQLException head = null;
+
+        SQLException tail = null;
+
+        int failedCnt = 0;
+
+        for (Map.Entry<Object, EntryProcessorResult<Boolean>> entry : res.entrySet()) {
+            Object key = entry.getKey();
+
+            try {
+                // The returned flag itself is irrelevant here - we only need to know whether get() throws.
+                entry.getValue().get();
+            }
+            catch (EntryProcessorException ex) {
+                SQLException sqlEx = createJdbcSqlException("Failed to process key '" + key + '\'',
+                    IgniteQueryErrorCode.ENTRY_PROCESSING);
+
+                sqlEx.initCause(ex);
+
+                // Append to the chain, remembering its head for the caller.
+                if (tail == null)
+                    head = sqlEx;
+                else
+                    tail.setNextException(sqlEx);
+
+                tail = sqlEx;
+
+                skippedKeys.remove(key);
+
+                failedCnt++;
+            }
+        }
+
+        return new PageProcessingErrorResult(skippedKeys.toArray(), head, failedCnt);
+    }
+
+    /**
+     * Execute MERGE statement plan: rows taken from {@code cursor} are converted to key-value pairs and written
+     * to cache with {@code put} (single row plans) or {@code putAll} (page by page).
+     *
+     * @param plan Update plan supplying rows number, column names, key/value suppliers and target table.
+     * @param cursor Cursor to take inserted data from.
+     * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
+     * @return Number of items affected.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings("unchecked")
+    private long doMerge(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+        GridCacheContext cctx = desc.context();
+
+        // If we have just one item to put, just do so
+        if (plan.rowsNum == 1) {
+            IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next().toArray(), plan.colNames, plan.keySupplier,
+                plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+
+            cctx.cache().put(t.getKey(), t.getValue());
+
+            return 1;
+        }
+        else {
+            // Accumulated as long to match the return type - the total over all pages may not fit into int.
+            long resCnt = 0;
+
+            Map<Object, Object> rows = new LinkedHashMap<>();
+
+            for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
+                List<?> row = it.next();
+
+                IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier, plan.valSupplier,
+                    plan.keyColIdx, plan.valColIdx, desc.type());
+
+                rows.put(t.getKey(), t.getValue());
+
+                // Flush when the page is full or when the cursor is exhausted.
+                if ((pageSize > 0 && rows.size() == pageSize) || !it.hasNext()) {
+                    cctx.cache().putAll(rows);
+
+                    resCnt += rows.size();
+
+                    if (it.hasNext())
+                        rows.clear(); // No need to clear after the last batch.
+                }
+            }
+
+            return resCnt;
+        }
+    }
+
+    /**
+     * Execute INSERT statement plan: rows taken from {@code cursor} are converted to key-value pairs and put to
+     * cache only for keys that have no value yet; keys that already have a value are reported as duplicates.
+     *
+     * @param plan Update plan supplying rows number, column names, key/value suppliers and target table.
+     * @param cursor Cursor to take inserted data from.
+     * @param pageSize Batch size for streaming, anything <= 0 for single page operations.
+     * @return Number of items affected.
+     * @throws IgniteCheckedException if failed, particularly in case of duplicate keys.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    private long doInsert(UpdatePlan plan, QueryCursorImpl<List<?>> cursor, int pageSize) throws IgniteCheckedException {
+        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
+
+        GridCacheContext cctx = desc.context();
+
+        // If we have just one item to put, just do so
+        if (plan.rowsNum == 1) {
+            IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next().toArray(), plan.colNames, plan.keySupplier,
+                plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+
+            if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
+                return 1;
+            else
+                throw new IgniteSQLException("Duplicate key during INSERT [key=" + t.getKey() + ']',
+                    IgniteQueryErrorCode.DUPLICATE_KEY);
+        }
+        else {
+            // Remember the original context to restore it in the finally block below.
+            CacheOperationContext opCtx = cctx.operationContextPerCall();
+
+            // Force keepBinary for operation context to avoid binary deserialization inside entry processor
+            if (cctx.binaryMarshaller()) {
+                CacheOperationContext newOpCtx = null;
+
+                if (opCtx == null)
+                    // Mimics behavior of GridCacheAdapter#keepBinary and GridCacheProxyImpl#keepBinary
+                    newOpCtx = new CacheOperationContext(false, null, true, null, false, null);
+                else if (!opCtx.isKeepBinary())
+                    newOpCtx = opCtx.keepBinary();
+
+                if (newOpCtx != null)
+                    cctx.operationContextPerCall(newOpCtx);
+            }
+
+            Map<Object, EntryProcessor<Object, Object, Boolean>> rows = plan.isLocSubqry ?
+                new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>(plan.rowsNum) :
+                new LinkedHashMap<Object, EntryProcessor<Object, Object, Boolean>>();
+
+            // Keys that failed to INSERT due to duplication.
+            List<Object> duplicateKeys = new ArrayList<>();
+
+            // Page counters are long, so the total is kept as long too: with an int counter the compound
+            // assignment below would silently narrow the value.
+            long resCnt = 0;
+
+            SQLException resEx = null;
+
+            try {
+                Iterator<List<?>> it = cursor.iterator();
+
+                while (it.hasNext()) {
+                    List<?> row = it.next();
+
+                    final IgniteBiTuple t = rowToKeyValue(cctx, row.toArray(), plan.colNames, plan.keySupplier,
+                        plan.valSupplier, plan.keyColIdx, plan.valColIdx, desc.type());
+
+                    rows.put(t.getKey(), new InsertEntryProcessor(t.getValue()));
+
+                    // Flush when the cursor is exhausted or when the page is full.
+                    if (!it.hasNext() || (pageSize > 0 && rows.size() == pageSize)) {
+                        PageProcessingResult pageRes = processPage(cctx, rows);
+
+                        resCnt += pageRes.cnt;
+
+                        duplicateKeys.addAll(F.asList(pageRes.errKeys));
+
+                        if (pageRes.ex != null) {
+                            if (resEx == null)
+                                resEx = pageRes.ex;
+                            else
+                                resEx.setNextException(pageRes.ex);
+                        }
+
+                        rows.clear();
+                    }
+                }
+
+                if (!F.isEmpty(duplicateKeys)) {
+                    String msg = "Failed to INSERT some keys because they are already in cache " +
+                        "[keys=" + duplicateKeys + ']';
+
+                    SQLException dupEx = new SQLException(msg, null, IgniteQueryErrorCode.DUPLICATE_KEY);
+
+                    if (resEx == null)
+                        resEx = dupEx;
+                    else
+                        resEx.setNextException(dupEx);
+                }
+
+                if (resEx != null)
+                    throw new IgniteSQLException(resEx);
+
+                return resCnt;
+            }
+            finally {
+                cctx.operationContextPerCall(opCtx);
+            }
+        }
+    }
+
+    /**
+     * Runs the given entry processors with a single {@code invokeAll} call and summarizes the outcome.
+     *
+     * @param cctx Cache context.
+     * @param rows Rows to process.
+     * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
+     *     updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
+     * @throws IgniteCheckedException If {@code invokeAll} failed.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    private static PageProcessingResult processPage(GridCacheContext cctx,
+        Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException {
+        Map<Object, EntryProcessorResult<Boolean>> invokeRes = cctx.cache().invokeAll(rows);
+
+        int total = rows.size();
+
+        // Nothing reported for any key - the whole page is counted as changed.
+        if (F.isEmpty(invokeRes))
+            return new PageProcessingResult(total, null, null);
+
+        PageProcessingErrorResult errs = splitErrors(invokeRes);
+
+        // Changed rows = all rows minus skipped keys minus keys whose processing threw an exception.
+        int changed = total - errs.errKeys.length - errs.cnt;
+
+        return new PageProcessingResult(changed, errs.errKeys, errs.ex);
+    }
+
+    /**
+     * Builds the key-value pair to be written to cache from a row given as an array of column values.
+     *
+     * @param cctx Cache context.
+     * @param row Row to process.
+     * @param cols Query cols.
+     * @param keySupplier Key instantiation method.
+     * @param valSupplier Value instantiation method.
+     * @param keyColIdx Key column index, or {@code -1} if no key column is mentioned in {@code cols}.
+     * @param valColIdx Value column index, or {@code -1} if no value column is mentioned in {@code cols}.
+     * @param desc Table descriptor.
+     * @return Key-value pair.
+     * @throws IgniteCheckedException if failed.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions", "ResultOfMethodCallIgnored"})
+    private IgniteBiTuple<?, ?> rowToKeyValue(GridCacheContext cctx, Object[] row, String[] cols,
+        KeyValueSupplier keySupplier, KeyValueSupplier valSupplier, int keyColIdx, int valColIdx,
+        GridQueryTypeDescriptor desc) throws IgniteCheckedException {
+        Object key = keySupplier.apply(F.asList(row));
+        Object val = valSupplier.apply(F.asList(row));
+
+        if (key == null)
+            throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY);
+
+        if (val == null)
+            throw new IgniteSQLException("Value for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_VALUE);
+
+        // Key and value columns have been consumed by the suppliers above - set only the remaining columns.
+        for (int colIdx = 0; colIdx < cols.length; colIdx++) {
+            if (colIdx != keyColIdx && colIdx != valColIdx)
+                desc.setValue(cols[colIdx], key, val, row[colIdx]);
+        }
+
+        if (!cctx.binaryMarshaller())
+            return new IgniteBiTuple<>(key, val);
+
+        // Finalize builders first, then make sure resulting binary objects carry a hash code.
+        if (key instanceof BinaryObjectBuilder)
+            key = ((BinaryObjectBuilder) key).build();
+
+        if (val instanceof BinaryObjectBuilder)
+            val = ((BinaryObjectBuilder) val).build();
+
+        if (key instanceof BinaryObject)
+            key = updateHashCodeIfNeeded(cctx, (BinaryObject) key);
+
+        if (val instanceof BinaryObject)
+            val = updateHashCodeIfNeeded(cctx, (BinaryObject) val);
+
+        return new IgniteBiTuple<>(key, val);
+    }
+
+    /**
+     * Returns the given binary object with a hash code set: the object itself if it already has one, otherwise
+     * a rebuilt copy whose hash code is computed by {@code BinaryArrayIdentityResolver}. A warning is logged
+     * the first time a type without hash code is met.
+     *
+     * @param cctx Cache context.
+     * @param binObj Binary object.
+     * @return Binary object with hash code set.
+     */
+    private BinaryObject updateHashCodeIfNeeded(GridCacheContext cctx, BinaryObject binObj) {
+        if (!U.isHashCodeEmpty(binObj))
+            return binObj;
+
+        // Warn only once per type.
+        if (WARNED_TYPES.add(binObj.type().typeId()))
+            U.warn(indexing.getLogger(), "Binary object's type does not have identity resolver explicitly set, therefore " +
+                "BinaryArrayIdentityResolver is used to generate hash codes for its instances, and therefore " +
+                "hash code of this binary object will most likely not match that of its non serialized form. " +
+                "For finer control over identity of this type, please update your BinaryConfiguration accordingly." +
+                " [typeId=" + binObj.type().typeId() + ", typeName=" + binObj.type().typeName() + ']');
+
+        int hash = BinaryArrayIdentityResolver.instance().hashCode(binObj);
+
+        // Empty hash code means no identity set for the type, therefore, we can safely set hash code
+        // via this Builder as it won't be overwritten.
+        BinaryObjectBuilder builder = cctx.grid().binary().builder(binObj);
+
+        return builder.hashCode(hash).build();
+    }
+
+    /**
+     * Entry processor invoked by INSERT operations: sets the value only if the entry has none.
+     */
+    private static final class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+        /** Value to set. */
+        private final Object val;
+
+        /**
+         * @param val Value to set.
+         */
+        private InsertEntryProcessor(Object val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+            if (entry.getValue() == null) {
+                entry.setValue(val);
+
+                // To leave out only erroneous keys - nulls are skipped on results' processing.
+                return null;
+            }
+
+            // Entry already has a value - report the key as a duplicate.
+            return false;
+        }
+    }
+
+    /**
+     * Entry processor invoked by UPDATE and DELETE operations. Applies the given modifier only if the entry
+     * still holds the expected value.
+     */
+    private static final class ModifyingEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
+        /** Value to expect. */
+        private final Object val;
+
+        /** Action to perform on entry. */
+        private final IgniteInClosure<MutableEntry<Object, Object>> entryModifier;
+
+        /**
+         * @param val Value to expect.
+         * @param entryModifier Action to perform on entry.
+         */
+        private ModifyingEntryProcessor(Object val, IgniteInClosure<MutableEntry<Object, Object>> entryModifier) {
+            this.val = val;
+            this.entryModifier = entryModifier;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean process(MutableEntry<Object, Object> entry, Object... arguments) throws EntryProcessorException {
+            if (F.eq(entry.getValue(), val)) {
+                entryModifier.apply(entry);
+
+                // To leave out only erroneous keys - nulls are skipped on results' processing.
+                return null;
+            }
+
+            // Something happened to the cache while we were performing map-reduce.
+            return false;
+        }
+    }
+
+    /** */
+    private static IgniteInClosure<MutableEntry<Object, Object>> RMV = new IgniteInClosure<MutableEntry<Object, Object>>() {
+        /** {@inheritDoc} */
+        @Override public void apply(MutableEntry<Object, Object> e) {
+            e.remove();
+        }
+    };
+
+    /**
+     * Entry modifier that overwrites the value of the entry with the one given at construction time.
+     */
+    private static final class EntryValueUpdater implements IgniteInClosure<MutableEntry<Object, Object>> {
+        /** Value to set. */
+        private final Object val;
+
+        /**
+         * @param val Value to set.
+         */
+        private EntryValueUpdater(Object val) {
+            this.val = val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(MutableEntry<Object, Object> entry) {
+            entry.setValue(val);
+        }
+    }
+
+    /**
+     * Builds a cursor over a single row with a single column holding the number of items affected by
+     * a DML operation.
+     *
+     * @param itemsCnt Update result to wrap.
+     * @return Resulting Iterable.
+     */
+    @SuppressWarnings("unchecked")
+    private static QueryCursorImpl<List<?>> cursorForUpdateResult(long itemsCnt) {
+        // The only row of the result - its only column is the items count.
+        List<?> row = Collections.singletonList(itemsCnt);
+
+        QueryCursorImpl<List<?>> cur = new QueryCursorImpl(Collections.singletonList(row), null, false);
+
+        cur.fieldsMeta(UPDATE_RESULT_META);
+
+        return cur;
+    }
+
+    /** Update result - modifications count and keys to re-run query with, if needed. */
+    private final static class UpdateResult {
+        /** Number of processed items. */
+        final long cnt;
+
+        /** Keys that failed to be UPDATEd or DELETEd due to concurrent modification of values. Never null. */
+        @NotNull
+        final Object[] errKeys;
+
+        /**
+         * @param cnt Number of processed items.
+         * @param errKeys Keys that failed to be processed; {@code null} is replaced with an empty array.
+         */
+        @SuppressWarnings("ConstantConditions")
+        private UpdateResult(long cnt, Object[] errKeys) {
+            this.cnt = cnt;
+            this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+        }
+    }
+
+    /** Result of processing an individual page with {@link IgniteCache#invokeAll} including error details, if any. */
+    private final static class PageProcessingResult {
+        /** Number of successfully processed items. */
+        final long cnt;
+
+        /**
+         * Keys skipped by entry processors without an exception: INSERT duplicates, or keys that failed to be
+         * UPDATEd or DELETEd due to concurrent modification of values. Never null.
+         */
+        @NotNull
+        final Object[] errKeys;
+
+        /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
+        final SQLException ex;
+
+        /**
+         * @param cnt Number of successfully processed items.
+         * @param errKeys Skipped keys; {@code null} is replaced with an empty array.
+         * @param ex Chain of exceptions corresponding to failed keys, or {@code null} if there were none.
+         */
+        @SuppressWarnings("ConstantConditions")
+        private PageProcessingResult(long cnt, Object[] errKeys, SQLException ex) {
+            this.cnt = cnt;
+            this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+            this.ex = ex;
+        }
+    }
+
+    /**
+     * Outcome of separating keys whose processing threw an exception from keys that were skipped by the logic of
+     * {@link EntryProcessor}s (most likely INSERT duplicates, or UPDATE/DELETE keys whose values had been
+     * modified concurrently). Carries the skipped keys along with the count and the chain of the exceptions.
+     */
+    private static final class PageProcessingErrorResult {
+        /** Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). */
+        @NotNull
+        final Object[] errKeys;
+
+        /** Number of entries whose processing resulted into an exception. */
+        final int cnt;
+
+        /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
+        final SQLException ex;
+
+        /**
+         * @param errKeys Skipped keys; {@code null} is replaced with an empty array.
+         * @param ex Chain of exceptions corresponding to failed keys, or {@code null} if there were none.
+         * @param exCnt Number of entries whose processing resulted into an exception.
+         */
+        @SuppressWarnings("ConstantConditions")
+        private PageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) {
+            // Exceptions count is zero if and only if there is no exceptions chain.
+            assert exCnt == 0 ^ ex != null;
+
+            this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+            this.cnt = exCnt;
+            this.ex = ex;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/86d143bb/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
index e0680d3..c8c26c4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/GridH2ResultSetIterator.java
@@ -24,6 +24,7 @@ import java.util.NoSuchElementException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2ValueCacheObject;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -136,7 +137,7 @@ public abstract class GridH2ResultSetIterator<T> extends GridCloseableIteratorAd
             return true;
         }
         catch (SQLException e) {
-            throw new IgniteException(e);
+            throw new IgniteSQLException(e);
         }
     }
 


Mime
View raw message