ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [21/50] [abbrv] ignite git commit: IGNITE-7200: SQL: simplified DML module structure and restored encapsulation. This closes #3225.
Date Wed, 20 Dec 2017 08:04:29 GMT
IGNITE-7200: SQL: simplified DML module structure and restored encapsulation. This closes #3225.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/03bb5513
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/03bb5513
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/03bb5513

Branch: refs/heads/ignite-zk
Commit: 03bb551382be8303a4bcaf0afc3ffa0f9c2885dd
Parents: 802a166
Author: devozerov <vozerov@gridgain.com>
Authored: Thu Dec 14 15:59:37 2017 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Thu Dec 14 15:59:37 2017 +0300

----------------------------------------------------------------------
 .../query/h2/DmlStatementsProcessor.java        | 637 ++----------------
 .../processors/query/h2/UpdateResult.java       |   4 +-
 .../processors/query/h2/dml/DmlAstUtils.java    | 609 ++++++++++++++++++
 .../processors/query/h2/dml/DmlBatchSender.java | 232 +++++++
 .../query/h2/dml/DmlDistributedPlanInfo.java    |  56 ++
 .../h2/dml/DmlPageProcessingErrorResult.java    |  76 +++
 .../query/h2/dml/DmlPageProcessingResult.java   |  68 ++
 .../processors/query/h2/dml/DmlUtils.java       | 118 ++++
 .../processors/query/h2/dml/FastUpdate.java     | 175 +++++
 .../query/h2/dml/FastUpdateArguments.java       |  53 --
 .../processors/query/h2/dml/UpdatePlan.java     | 389 ++++++++---
 .../query/h2/dml/UpdatePlanBuilder.java         |  82 ++-
 .../processors/query/h2/sql/DmlAstUtils.java    | 644 -------------------
 .../query/h2/sql/GridSqlQueryParser.java        |   1 +
 14 files changed, 1752 insertions(+), 1392 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index c3d48dd..243d1dc 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -17,42 +17,29 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
-import java.lang.reflect.Array;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.sql.Time;
-import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.EntryProcessorException;
-import javax.cache.processor.EntryProcessorResult;
 import javax.cache.processor.MutableEntry;
-import org.apache.ignite.IgniteCache;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.binary.BinaryObjectBuilder;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
-import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.QueryCursorImpl;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
@@ -61,21 +48,19 @@ import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResult;
 import org.apache.ignite.internal.processors.query.GridQueryFieldsResultAdapter;
-import org.apache.ignite.internal.processors.query.GridQueryProperty;
-import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
-import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.dml.FastUpdateArguments;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlBatchSender;
+import org.apache.ignite.internal.processors.query.h2.dml.DmlDistributedPlanInfo;
+import org.apache.ignite.internal.processors.query.h2.dml.FastUpdate;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdateMode;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlan;
 import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
-import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.lang.IgniteSingletonIterator;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteInClosure;
@@ -85,19 +70,10 @@ import org.h2.command.dml.Delete;
 import org.h2.command.dml.Insert;
 import org.h2.command.dml.Merge;
 import org.h2.command.dml.Update;
-import org.h2.table.Column;
-import org.h2.util.DateTimeUtils;
-import org.h2.util.LocalDateTimeUtils;
-import org.h2.value.Value;
-import org.h2.value.ValueDate;
-import org.h2.value.ValueTime;
-import org.h2.value.ValueTimestamp;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
 import static org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing.UPDATE_RESULT_META;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap.DEFAULT_COLUMNS_COUNT;
 
 /**
  *
@@ -142,7 +118,7 @@ public class DmlStatementsProcessor {
         while (iter.hasNext()) {
             UpdatePlan plan = iter.next().getValue();
 
-            if (F.eq(cacheName, plan.tbl.cacheName()))
+            if (F.eq(cacheName, plan.cacheContext().name()))
                 iter.remove();
         }
     }
@@ -169,7 +145,7 @@ public class DmlStatementsProcessor {
 
         UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
 
-        GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
+        GridCacheContext<?, ?> cctx = plan.cacheContext();
 
         for (int i = 0; i < DFLT_DML_RERUN_ATTEMPTS; i++) {
             CacheOperationContext opCtx = cctx.operationContextPerCall();
@@ -281,20 +257,20 @@ public class DmlStatementsProcessor {
 
         UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null);
 
-        if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().name()))
+        if (!F.eq(streamer.cacheName(), plan.cacheContext().name()))
             throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
                 " in connection options", IgniteQueryErrorCode.UNSUPPORTED_OPERATION);
 
-        if (plan.mode == UpdateMode.INSERT && plan.rowsNum > 0) {
-            assert plan.isLocSubqry;
+        if (plan.mode() == UpdateMode.INSERT && plan.rowCount() > 0) {
+            assert plan.isLocalSubquery();
 
-            final GridCacheContext cctx = plan.tbl.rowDescriptor().context();
+            final GridCacheContext cctx = plan.cacheContext();
 
             QueryCursorImpl<List<?>> cur;
 
-            final ArrayList<List<?>> data = new ArrayList<>(plan.rowsNum);
+            final ArrayList<List<?>> data = new ArrayList<>(plan.rowCount());
 
-            final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQry,
+            final GridQueryFieldsResult res = idx.queryLocalSqlFields(idx.schema(cctx.name()), plan.selectQuery(),
                 F.asList(args), null, false, 0, null);
 
             QueryCursorImpl<List<?>> stepCur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -317,18 +293,18 @@ public class DmlStatementsProcessor {
                 }
             }, null);
 
-            if (plan.rowsNum == 1) {
-                IgniteBiTuple t = rowToKeyValue(cctx, cur.iterator().next(), plan);
+            if (plan.rowCount() == 1) {
+                IgniteBiTuple t = plan.processRow(cur.iterator().next());
 
                 streamer.addData(t.getKey(), t.getValue());
 
                 return 1;
             }
 
-            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowsNum);
+            Map<Object, Object> rows = new LinkedHashMap<>(plan.rowCount());
 
             for (List<?> row : cur) {
-                final IgniteBiTuple t = rowToKeyValue(cctx, row, plan);
+                final IgniteBiTuple t = plan.processRow(row);
 
                 rows.put(t.getKey(), t.getValue());
             }
@@ -367,13 +343,15 @@ public class DmlStatementsProcessor {
 
         UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos);
 
-        if (plan.fastUpdateArgs != null) {
+        FastUpdate fastUpdate = plan.fastUpdate();
+
+        if (fastUpdate != null) {
             assert F.isEmpty(failedKeys) && errKeysPos == null;
 
-            return doFastUpdate(plan, fieldsQry.getArgs());
+            return fastUpdate.execute(plan.cacheContext().cache(), fieldsQry.getArgs());
         }
 
-        if (plan.distributed != null) {
+        if (plan.distributedPlan() != null) {
             UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel);
 
             // null is returned in case not all nodes support distributed DML.
@@ -381,14 +359,14 @@ public class DmlStatementsProcessor {
                 return result;
         }
 
-        assert !F.isEmpty(plan.selectQry);
+        assert !F.isEmpty(plan.selectQuery());
 
         QueryCursorImpl<List<?>> cur;
 
         // Do a two-step query only if locality flag is not set AND if plan's SELECT corresponds to an actual
         // sub-query and not some dummy stuff like "select 1, 2, 3;"
-        if (!loc && !plan.isLocSubqry) {
-            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQry, fieldsQry.isCollocated())
+        if (!loc && !plan.isLocalSubquery()) {
+            SqlFieldsQuery newFieldsQry = new SqlFieldsQuery(plan.selectQuery(), fieldsQry.isCollocated())
                 .setArgs(fieldsQry.getArgs())
                 .setDistributedJoins(fieldsQry.isDistributedJoins())
                 .setEnforceJoinOrder(fieldsQry.isEnforceJoinOrder())
@@ -400,7 +378,7 @@ public class DmlStatementsProcessor {
                 cancel, mainCacheId, true).get(0);
         }
         else {
-            final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQry,
+            final GridQueryFieldsResult res = idx.queryLocalSqlFields(schemaName, plan.selectQuery(),
                 F.asList(fieldsQry.getArgs()), filters, fieldsQry.isEnforceJoinOrder(), fieldsQry.getTimeout(), cancel);
 
             cur = new QueryCursorImpl<>(new Iterable<List<?>>() {
@@ -430,7 +408,7 @@ public class DmlStatementsProcessor {
      */
     private UpdateResult processDmlSelectResult(GridCacheContext cctx, UpdatePlan plan, Iterable<List<?>> cursor,
         int pageSize) throws IgniteCheckedException {
-        switch (plan.mode) {
+        switch (plan.mode()) {
             case MERGE:
                 return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
 
@@ -444,7 +422,7 @@ public class DmlStatementsProcessor {
                 return doDelete(cctx, cursor, pageSize);
 
             default:
-                throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']',
+                throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode() + ']',
                     IgniteQueryErrorCode.UNEXPECTED_OPERATION);
         }
     }
@@ -480,46 +458,6 @@ public class DmlStatementsProcessor {
     }
 
     /**
-     * Perform single cache operation based on given args.
-     * @param plan Update plan.
-     * @param args Query parameters.
-     * @return 1 if an item was affected, 0 otherwise.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private static UpdateResult doFastUpdate(UpdatePlan plan, Object[] args) throws IgniteCheckedException {
-        GridCacheContext cctx = plan.tbl.rowDescriptor().context();
-
-        FastUpdateArguments singleUpdate = plan.fastUpdateArgs;
-
-        assert singleUpdate != null;
-
-        boolean valBounded = (singleUpdate.val != FastUpdateArguments.NULL_ARGUMENT);
-
-        if (singleUpdate.newVal != FastUpdateArguments.NULL_ARGUMENT) { // Single item UPDATE
-            Object key = singleUpdate.key.apply(args);
-            Object newVal = singleUpdate.newVal.apply(args);
-
-            if (valBounded) {
-                Object val = singleUpdate.val.apply(args);
-
-                return (cctx.cache().replace(key, val, newVal) ? UpdateResult.ONE : UpdateResult.ZERO);
-            }
-            else
-                return (cctx.cache().replace(key, newVal) ? UpdateResult.ONE : UpdateResult.ZERO);
-        }
-        else { // Single item DELETE
-            Object key = singleUpdate.key.apply(args);
-            Object val = singleUpdate.val.apply(args);
-
-            if (singleUpdate.val == FastUpdateArguments.NULL_ARGUMENT) // No _val bound in source query
-                return cctx.cache().remove(key) ? UpdateResult.ONE : UpdateResult.ZERO;
-            else
-                return cctx.cache().remove(key, val) ? UpdateResult.ONE : UpdateResult.ZERO;
-        }
-    }
-
-    /**
      * @param schemaName Schema name.
      * @param fieldsQry Initial query.
      * @param plan Update plan.
@@ -529,13 +467,15 @@ public class DmlStatementsProcessor {
      */
     private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan,
         GridQueryCancel cancel) throws IgniteCheckedException {
-        assert plan.distributed != null;
+        DmlDistributedPlanInfo distributedPlan = plan.distributedPlan();
+
+        assert distributedPlan != null;
 
         if (cancel == null)
             cancel = new GridQueryCancel();
 
-        return idx.runDistributedUpdate(schemaName, fieldsQry, plan.distributed.getCacheIds(),
-            plan.distributed.isReplicatedOnly(), cancel);
+        return idx.runDistributedUpdate(schemaName, fieldsQry, distributedPlan.getCacheIds(),
+            distributedPlan.isReplicatedOnly(), cancel);
     }
 
     /**
@@ -548,7 +488,7 @@ public class DmlStatementsProcessor {
     @SuppressWarnings({"unchecked", "ConstantConditions", "ThrowableResultOfMethodCallIgnored"})
     private UpdateResult doDelete(GridCacheContext cctx, Iterable<List<?>> cursor, int pageSize)
         throws IgniteCheckedException {
-        BatchSender sender = new BatchSender(cctx, pageSize);
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
 
         for (List<?> row : cursor) {
             if (row.size() != 2) {
@@ -594,84 +534,18 @@ public class DmlStatementsProcessor {
     @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
     private UpdateResult doUpdate(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize)
         throws IgniteCheckedException {
-        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
-
-        GridCacheContext cctx = desc.context();
-
-        boolean bin = cctx.binaryMarshaller();
-
-        String[] updatedColNames = plan.colNames;
-
-        int valColIdx = plan.valColIdx;
+        GridCacheContext cctx = plan.cacheContext();
 
-        boolean hasNewVal = (valColIdx != -1);
-
-        // Statement updates distinct properties if it does not have _val in updated columns list
-        // or if its list of updated columns includes only _val, i.e. is single element.
-        boolean hasProps = !hasNewVal || updatedColNames.length > 1;
-
-        BatchSender sender = new BatchSender(cctx, pageSize);
+        DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
 
         for (List<?> row : cursor) {
-            Object key = row.get(0);
-
-            Object newVal;
-
-            Map<String, Object> newColVals = new HashMap<>();
-
-            for (int i = 0; i < plan.colNames.length; i++) {
-                if (hasNewVal && i == valColIdx - 2)
-                    continue;
-
-                GridQueryProperty prop = plan.tbl.rowDescriptor().type().property(plan.colNames[i]);
-
-                assert prop != null : "Unknown property: " + plan.colNames[i];
-
-                newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i]));
-            }
-
-            newVal = plan.valSupplier.apply(row);
-
-            if (newVal == null)
-                throw new IgniteSQLException("New value for UPDATE must not be null", IgniteQueryErrorCode.NULL_VALUE);
-
-            // Skip key and value - that's why we start off with 3rd column
-            for (int i = 0; i < plan.tbl.getColumns().length - DEFAULT_COLUMNS_COUNT; i++) {
-                Column c = plan.tbl.getColumn(i + DEFAULT_COLUMNS_COUNT);
-
-                if (desc.isKeyValueOrVersionColumn(c.getColumnId()))
-                    continue;
-
-                GridQueryProperty prop = desc.type().property(c.getName());
-
-                if (prop.key())
-                    continue; // Don't get values of key's columns - we won't use them anyway
-
-                boolean hasNewColVal = newColVals.containsKey(c.getName());
-
-                if (!hasNewColVal)
-                    continue;
-
-                Object colVal = newColVals.get(c.getName());
-
-                // UPDATE currently does not allow to modify key or its fields, so we must be safe to pass null as key.
-                desc.setColumnValue(null, newVal, colVal, i);
-            }
-
-            if (bin && hasProps) {
-                assert newVal instanceof BinaryObjectBuilder;
-
-                newVal = ((BinaryObjectBuilder) newVal).build();
-            }
-
-            desc.type().validateKeyAndValue(key, newVal);
-
-            Object srcVal = row.get(1);
+            T3<Object, Object, Object> row0 = plan.processRowForUpdate(row);
 
-            if (bin && !(srcVal instanceof BinaryObject))
-                srcVal = cctx.grid().binary().toBinary(srcVal);
+            Object key = row0.get1();
+            Object oldVal = row0.get2();
+            Object newVal = row0.get3();
 
-            sender.add(key, new ModifyingEntryProcessor(srcVal, new EntryValueUpdater(newVal)));
+            sender.add(key, new ModifyingEntryProcessor(oldVal, new EntryValueUpdater(newVal)));
         }
 
         sender.flush();
@@ -699,120 +573,6 @@ public class DmlStatementsProcessor {
     }
 
     /**
-     * Convert value to column's expected type by means of H2.
-     *
-     * @param val Source value.
-     * @param desc Row descriptor.
-     * @param expCls Expected value class.
-     * @param type Expected column type to convert to.
-     * @return Converted object.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"})
-    private static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type)
-        throws IgniteCheckedException {
-        if (val == null)
-            return null;
-
-        Class<?> currCls = val.getClass();
-
-        try {
-            if (val instanceof Date && currCls != Date.class && expCls == Date.class) {
-                // H2 thinks that java.util.Date is always a Timestamp, while binary marshaller expects
-                // precise Date instance. Let's satisfy it.
-                return new Date(((Date) val).getTime());
-            }
-
-            // User-given UUID is always serialized by H2 to byte array, so we have to deserialize manually
-            if (type == Value.UUID && currCls == byte[].class)
-                return U.unmarshal(desc.context().marshaller(), (byte[]) val,
-                    U.resolveClassLoader(desc.context().gridConfig()));
-
-            if (LocalDateTimeUtils.isJava8DateApiPresent()) {
-                if (val instanceof Timestamp && LocalDateTimeUtils.isLocalDateTime(expCls))
-                    return LocalDateTimeUtils.valueToLocalDateTime(ValueTimestamp.get((Timestamp) val));
-
-                if (val instanceof Date && LocalDateTimeUtils.isLocalDate(expCls))
-                    return LocalDateTimeUtils.valueToLocalDate(ValueDate.fromDateValue(
-                        DateTimeUtils.dateValueFromDate(((Date) val).getTime())));
-
-                if (val instanceof Time && LocalDateTimeUtils.isLocalTime(expCls))
-                    return LocalDateTimeUtils.valueToLocalTime(ValueTime.get((Time) val));
-            }
-
-            // We have to convert arrays of reference types manually -
-            // see https://issues.apache.org/jira/browse/IGNITE-4327
-            // Still, we only can convert from Object[] to something more precise.
-            if (type == Value.ARRAY && currCls != expCls) {
-                if (currCls != Object[].class)
-                    throw new IgniteCheckedException("Unexpected array type - only conversion from Object[] " +
-                        "is assumed");
-
-                // Why would otherwise type be Value.ARRAY?
-                assert expCls.isArray();
-
-                Object[] curr = (Object[]) val;
-
-                Object newArr = Array.newInstance(expCls.getComponentType(), curr.length);
-
-                System.arraycopy(curr, 0, newArr, 0, curr.length);
-
-                return newArr;
-            }
-
-            return H2Utils.convert(val, desc, type);
-        }
-        catch (Exception e) {
-            throw new IgniteSQLException("Value conversion failed [from=" + currCls.getName() + ", to=" +
-                expCls.getName() +']', IgniteQueryErrorCode.CONVERSION_FAILED, e);
-        }
-    }
-
-    /**
-     * Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose
-     * processing yielded an exception.
-     *
-     * @param res Result of {@link GridCacheAdapter#invokeAll)}
-     * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is
-     * null if all keys are duplicates/concurrently modified ones).
-     */
-    private static PageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) {
-        Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
-
-        SQLException currSqlEx = null;
-
-        SQLException firstSqlEx = null;
-
-        int errors = 0;
-
-        // Let's form a chain of SQL exceptions
-        for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) {
-            try {
-                e.getValue().get();
-            }
-            catch (EntryProcessorException ex) {
-                SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'',
-                    IgniteQueryErrorCode.ENTRY_PROCESSING);
-
-                next.initCause(ex);
-
-                if (currSqlEx != null)
-                    currSqlEx.setNextException(next);
-                else
-                    firstSqlEx = next;
-
-                currSqlEx = next;
-
-                errKeys.remove(e.getKey());
-
-                errors++;
-            }
-        }
-
-        return new PageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors);
-    }
-
-    /**
      * Execute MERGE statement plan.
      * @param cursor Cursor to take inserted data from.
      * @param pageSize Batch size to stream data from {@code cursor}, anything <= 0 for single page operations.
@@ -821,13 +581,11 @@ public class DmlStatementsProcessor {
      */
     @SuppressWarnings("unchecked")
     private long doMerge(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
-        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
-
-        GridCacheContext cctx = desc.context();
+        GridCacheContext cctx = plan.cacheContext();
 
         // If we have just one item to put, just do so
-        if (plan.rowsNum == 1) {
-            IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next(), plan);
+        if (plan.rowCount() == 1) {
+            IgniteBiTuple t = plan.processRow(cursor.iterator().next());
 
             cctx.cache().put(t.getKey(), t.getValue());
 
@@ -841,7 +599,7 @@ public class DmlStatementsProcessor {
             for (Iterator<List<?>> it = cursor.iterator(); it.hasNext();) {
                 List<?> row = it.next();
 
-                IgniteBiTuple t = rowToKeyValue(cctx, row, plan);
+                IgniteBiTuple t = plan.processRow(row);
 
                 rows.put(t.getKey(), t.getValue());
 
@@ -868,13 +626,11 @@ public class DmlStatementsProcessor {
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
     private long doInsert(UpdatePlan plan, Iterable<List<?>> cursor, int pageSize) throws IgniteCheckedException {
-        GridH2RowDescriptor desc = plan.tbl.rowDescriptor();
-
-        GridCacheContext cctx = desc.context();
+        GridCacheContext cctx = plan.cacheContext();
 
         // If we have just one item to put, just do so
-        if (plan.rowsNum == 1) {
-            IgniteBiTuple t = rowToKeyValue(cctx, cursor.iterator().next(), plan);
+        if (plan.rowCount() == 1) {
+            IgniteBiTuple t = plan.processRow(cursor.iterator().next());
 
             if (cctx.cache().putIfAbsent(t.getKey(), t.getValue()))
                 return 1;
@@ -884,10 +640,10 @@ public class DmlStatementsProcessor {
         }
         else {
             // Keys that failed to INSERT due to duplication.
-            BatchSender sender = new BatchSender(cctx, pageSize);
+            DmlBatchSender sender = new DmlBatchSender(cctx, pageSize);
 
             for (List<?> row : cursor) {
-                final IgniteBiTuple keyValPair = rowToKeyValue(cctx, row, plan);
+                final IgniteBiTuple keyValPair = plan.processRow(row);
 
                 sender.add(keyValPair.getKey(), new InsertEntryProcessor(keyValPair.getValue()));
             }
@@ -916,124 +672,6 @@ public class DmlStatementsProcessor {
     }
 
     /**
-     * Execute given entry processors and collect errors, if any.
-     * @param cctx Cache context.
-     * @param rows Rows to process.
-     * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
-     *     updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
-     * @throws IgniteCheckedException If failed.
-     */
-    @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private static PageProcessingResult processPage(GridCacheContext cctx,
-        Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException {
-        Map<Object, EntryProcessorResult<Boolean>> res = cctx.cache().invokeAll(rows);
-
-        if (F.isEmpty(res))
-            return new PageProcessingResult(rows.size(), null, null);
-
-        PageProcessingErrorResult splitRes = splitErrors(res);
-
-        int keysCnt = splitRes.errKeys.length;
-
-        return new PageProcessingResult(rows.size() - keysCnt - splitRes.cnt, splitRes.errKeys, splitRes.ex);
-    }
-
-    /**
-     * Convert row presented as an array of Objects into key-value pair to be inserted to cache.
-     * @param cctx Cache context.
-     * @param row Row to process.
-     * @param plan Update plan.
-     * @throws IgniteCheckedException if failed.
-     */
-    @SuppressWarnings({"unchecked", "ConstantConditions", "ResultOfMethodCallIgnored"})
-    private IgniteBiTuple<?, ?> rowToKeyValue(GridCacheContext cctx, List<?> row, UpdatePlan plan)
-        throws IgniteCheckedException {
-        GridH2RowDescriptor rowDesc = plan.tbl.rowDescriptor();
-        GridQueryTypeDescriptor desc = rowDesc.type();
-
-        Object key = plan.keySupplier.apply(row);
-
-        if (QueryUtils.isSqlType(desc.keyClass())) {
-            assert plan.keyColIdx != -1;
-
-            key = convert(key, rowDesc, desc.keyClass(), plan.colTypes[plan.keyColIdx]);
-        }
-
-        Object val = plan.valSupplier.apply(row);
-
-        if (QueryUtils.isSqlType(desc.valueClass())) {
-            assert plan.valColIdx != -1;
-
-            val = convert(val, rowDesc, desc.valueClass(), plan.colTypes[plan.valColIdx]);
-        }
-
-        if (key == null) {
-            if (F.isEmpty(desc.keyFieldName()))
-                throw new IgniteSQLException("Key for INSERT or MERGE must not be null", IgniteQueryErrorCode.NULL_KEY);
-            else
-                throw new IgniteSQLException("Null value is not allowed for column '" + desc.keyFieldName() + "'",
-                    IgniteQueryErrorCode.NULL_KEY);
-        }
-
-        if (val == null) {
-            if (F.isEmpty(desc.valueFieldName()))
-                throw new IgniteSQLException("Value for INSERT, MERGE, or UPDATE must not be null",
-                    IgniteQueryErrorCode.NULL_VALUE);
-            else
-                throw new IgniteSQLException("Null value is not allowed for column '" + desc.valueFieldName() + "'",
-                    IgniteQueryErrorCode.NULL_VALUE);
-        }
-
-        Map<String, Object> newColVals = new HashMap<>();
-
-        for (int i = 0; i < plan.colNames.length; i++) {
-            if (i == plan.keyColIdx || i == plan.valColIdx)
-                continue;
-
-            String colName = plan.colNames[i];
-
-            GridQueryProperty prop = desc.property(colName);
-
-            assert prop != null;
-
-            Class<?> expCls = prop.type();
-
-            newColVals.put(colName, convert(row.get(i), rowDesc, expCls, plan.colTypes[i]));
-        }
-
-        // We update columns in the order specified by the table for a reason - table's
-        // column order preserves their precedence for correct update of nested properties.
-        Column[] cols = plan.tbl.getColumns();
-
-        // First 3 columns are _key, _val and _ver. Skip 'em.
-        for (int i = DEFAULT_COLUMNS_COUNT; i < cols.length; i++) {
-            if (plan.tbl.rowDescriptor().isKeyValueOrVersionColumn(i))
-                continue;
-
-            String colName = cols[i].getName();
-
-            if (!newColVals.containsKey(colName))
-                continue;
-
-            Object colVal = newColVals.get(colName);
-
-            desc.setValue(colName, key, val, colVal);
-        }
-
-        if (cctx.binaryMarshaller()) {
-            if (key instanceof BinaryObjectBuilder)
-                key = ((BinaryObjectBuilder) key).build();
-
-            if (val instanceof BinaryObjectBuilder)
-                val = ((BinaryObjectBuilder) val).build();
-        }
-
-        desc.validateKeyAndValue(key, val);
-
-        return new IgniteBiTuple<>(key, val);
-    }
-
-    /**
      *
      * @param schemaName Schema name.
      * @param stmt Prepared statement.
@@ -1164,7 +802,7 @@ public class DmlStatementsProcessor {
     static void checkUpdateResult(UpdateResult r) {
         if (!F.isEmpty(r.errorKeys())) {
             String msg = "Failed to update some keys because they had been modified concurrently " +
-                "[keys=" + r.errorKeys() + ']';
+                "[keys=" + Arrays.toString(r.errorKeys()) + ']';
 
             SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
 
@@ -1172,175 +810,4 @@ public class DmlStatementsProcessor {
         }
     }
 
-    /** Result of processing an individual page with {@link IgniteCache#invokeAll} including error details, if any. */
-    private final static class PageProcessingResult {
-        /** Number of successfully processed items. */
-        final long cnt;
-
-        /** Keys that failed to be updated or deleted due to concurrent modification of values. */
-        @NotNull
-        final Object[] errKeys;
-
-        /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
-        final SQLException ex;
-
-        /** */
-        @SuppressWarnings("ConstantConditions")
-        private PageProcessingResult(long cnt, Object[] errKeys, SQLException ex) {
-            this.cnt = cnt;
-            this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
-            this.ex = ex;
-        }
-    }
-
-    /** Result of splitting keys whose processing resulted into an exception from those skipped by
-     * logic of {@link EntryProcessor}s (most likely INSERT duplicates, or UPDATE/DELETE keys whose values
-     * had been modified concurrently), counting and collecting entry processor exceptions.
-     */
-    private final static class PageProcessingErrorResult {
-        /** Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). */
-        @NotNull
-        final Object[] errKeys;
-
-        /** Number of entries whose processing resulted into an exception. */
-        final int cnt;
-
-        /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
-        final SQLException ex;
-
-        /** */
-        @SuppressWarnings("ConstantConditions")
-        private PageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) {
-            errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
-            // When exceptions count must be zero, exceptions chain must be not null, and vice versa.
-            assert exCnt == 0 ^ ex != null;
-
-            this.errKeys = errKeys;
-            this.cnt = exCnt;
-            this.ex = ex;
-        }
-    }
-
-    /**
-     * Batch sender class.
-     */
-    private static class BatchSender {
-        /** Cache context. */
-        private final GridCacheContext cctx;
-
-        /** Batch size. */
-        private final int size;
-
-        /** Batches. */
-        private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>();
-
-        /** Result count. */
-        private long updateCnt;
-
-        /** Failed keys. */
-        private List<Object> failedKeys;
-
-        /** Exception. */
-        private SQLException err;
-
-        /**
-         * Constructor.
-         *
-         * @param cctx Cache context.
-         * @param size Batch.
-         */
-        public BatchSender(GridCacheContext cctx, int size) {
-            this.cctx = cctx;
-            this.size = size;
-        }
-
-        /**
-         * Add entry to batch.
-         *
-         * @param key Key.
-         * @param proc Processor.
-         */
-        public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException {
-            ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
-
-            if (node == null)
-                throw new IgniteCheckedException("Failed to map key to node.");
-
-            UUID nodeId = node.id();
-
-            Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId);
-
-            if (batch == null) {
-                batch = new HashMap<>();
-
-                batches.put(nodeId, batch);
-            }
-
-            batch.put(key, proc);
-
-            if (batch.size() >= size) {
-                sendBatch(batch);
-
-                batch.clear();
-            }
-        }
-
-        /**
-         * Flush any remaining entries.
-         *
-         * @throws IgniteCheckedException If failed.
-         */
-        public void flush() throws IgniteCheckedException {
-            for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) {
-                if (!batch.isEmpty())
-                    sendBatch(batch);
-            }
-        }
-
-        /**
-         * @return Update count.
-         */
-        public long updateCount() {
-            return updateCnt;
-        }
-
-        /**
-         * @return Failed keys.
-         */
-        public List<Object> failedKeys() {
-            return failedKeys != null ? failedKeys : Collections.emptyList();
-        }
-
-        /**
-         * @return Error.
-         */
-        public SQLException error() {
-            return err;
-        }
-
-        /**
-         * Send the batch.
-         *
-         * @param batch Batch.
-         * @throws IgniteCheckedException If failed.
-         */
-        private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch)
-            throws IgniteCheckedException {
-            PageProcessingResult pageRes = processPage(cctx, batch);
-
-            updateCnt += pageRes.cnt;
-
-            if (failedKeys == null)
-                failedKeys = new ArrayList<>();
-
-            failedKeys.addAll(F.asList(pageRes.errKeys));
-
-            if (pageRes.ex != null) {
-                if (err == null)
-                    err = pageRes.ex;
-                else
-                    err.setNextException(pageRes.ex);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
index de0e63f..32381ba 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
@@ -25,10 +25,10 @@ import org.apache.ignite.internal.util.typedef.internal.U;
  */
 public final class UpdateResult {
     /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
-    final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
+    public static final UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
 
     /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
-    final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+    public static final UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
 
     /** Number of processed items. */
     private final long cnt;

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
new file mode 100644
index 0000000..054e708
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlAstUtils.java
@@ -0,0 +1,609 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2KeyValueRowOnheap;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAlias;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlArray;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlAst;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlColumn;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlConst;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlDelete;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlElement;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunction;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlFunctionType;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlJoin;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlKeyword;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperation;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlOperationType;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlParameter;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSubquery;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate;
+import org.apache.ignite.internal.util.lang.IgnitePair;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.h2.command.Parser;
+import org.h2.expression.Expression;
+import org.h2.table.Column;
+import org.h2.table.Table;
+import org.h2.util.IntArray;
+import org.h2.value.DataType;
+import org.h2.value.Value;
+import org.h2.value.ValueDate;
+import org.h2.value.ValueInt;
+import org.h2.value.ValueString;
+import org.h2.value.ValueTime;
+import org.h2.value.ValueTimestamp;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * AST utils for DML
+ */
+public final class DmlAstUtils {
+    /**
+     * Private constructor to prevent instantiation.
+     */
+    private DmlAstUtils() {
+        // No-op.
+    }
+
+    /**
+     * Create SELECT on which subsequent INSERT or MERGE will be based.
+     *
+     * @param cols Columns to insert values into.
+     * @param rows Rows to create pseudo-SELECT upon.
+     * @param subQry Subquery to use rather than rows.
+     * @return Subquery or pseudo-SELECT to evaluate inserted expressions.
+     */
+    public static GridSqlQuery selectForInsertOrMerge(GridSqlColumn[] cols, List<GridSqlElement[]> rows,
+        GridSqlQuery subQry) {
+        if (!F.isEmpty(rows)) {
+            assert !F.isEmpty(cols);
+
+            GridSqlSelect sel = new GridSqlSelect();
+
+            GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE);
+
+            sel.from(from);
+
+            GridSqlArray[] args = new GridSqlArray[cols.length];
+
+            for (int i = 0; i < cols.length; i++) {
+                GridSqlArray arr = new GridSqlArray(rows.size());
+
+                String colName = cols[i].columnName();
+
+                GridSqlAlias alias = new GridSqlAlias(colName, arr);
+
+                alias.resultType(cols[i].resultType());
+
+                from.addChild(alias);
+
+                args[i] = arr;
+
+                GridSqlColumn newCol = new GridSqlColumn(null, from, null,"TABLE", colName);
+
+                newCol.resultType(cols[i].resultType());
+
+                sel.addColumn(newCol, true);
+            }
+
+            for (GridSqlElement[] row : rows) {
+                assert cols.length == row.length;
+
+                for (int i = 0; i < row.length; i++)
+                    args[i].addChild(row[i]);
+            }
+
+            return sel;
+        }
+        else {
+            assert subQry != null;
+
+            return subQry;
+        }
+    }
+
+    /**
+     * Generate SQL SELECT based on DELETE's WHERE, LIMIT, etc.
+     *
+     * @param del Delete statement.
+     * @param keysParamIdx Index of the parameter that will hold the array of keys to filter by, or {@code null} if none.
+     * @return SELECT statement.
+     */
+    public static GridSqlSelect selectForDelete(GridSqlDelete del, @Nullable Integer keysParamIdx) {
+        GridSqlSelect mapQry = new GridSqlSelect();
+
+        mapQry.from(del.from());
+
+        Set<GridSqlTable> tbls = new HashSet<>();
+
+        collectAllGridTablesInTarget(del.from(), tbls);
+
+        assert tbls.size() == 1 : "Failed to determine target table for DELETE";
+
+        GridSqlTable tbl = tbls.iterator().next();
+
+        GridH2Table gridTbl = tbl.dataTable();
+
+        assert gridTbl != null : "Failed to determine target grid table for DELETE";
+
+        Column h2KeyCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.KEY_COL);
+
+        Column h2ValCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.VAL_COL);
+
+        GridSqlColumn keyCol = new GridSqlColumn(h2KeyCol, tbl, h2KeyCol.getName());
+        keyCol.resultType(GridSqlType.fromColumn(h2KeyCol));
+
+        GridSqlColumn valCol = new GridSqlColumn(h2ValCol, tbl, h2ValCol.getName());
+        valCol.resultType(GridSqlType.fromColumn(h2ValCol));
+
+        mapQry.addColumn(keyCol, true);
+        mapQry.addColumn(valCol, true);
+
+        GridSqlElement where = del.where();
+        if (keysParamIdx != null)
+            where = injectKeysFilterParam(where, keyCol, keysParamIdx);
+
+        mapQry.where(where);
+        mapQry.limit(del.limit());
+
+        return mapQry;
+    }
+
+    /**
+     * @param update UPDATE statement.
+     * @return Fast update task if the statement directly updates the {@code _val} column with a literal or param value
+     * and filters by a single non-expression key (and, optionally, a single non-expression value); {@code null} otherwise.
+     */
+    public static FastUpdate getFastUpdateArgs(GridSqlUpdate update) {
+        IgnitePair<GridSqlElement> filter = findKeyValueEqualityCondition(update.where());
+
+        if (filter == null)
+            return null;
+
+        if (update.cols().size() != 1)
+            return null;
+
+        Table tbl = update.cols().get(0).column().getTable();
+        if (!(tbl instanceof GridH2Table))
+            return null;
+
+        GridH2RowDescriptor desc = ((GridH2Table)tbl).rowDescriptor();
+        if (!desc.isValueColumn(update.cols().get(0).column().getColumnId()))
+            return null;
+
+        GridSqlElement set = update.set().get(update.cols().get(0).columnName());
+
+        if (!(set instanceof GridSqlConst || set instanceof GridSqlParameter))
+            return null;
+
+        return FastUpdate.create(filter.getKey(), filter.getValue(), set);
+    }
+
+    /**
+     * @param del DELETE statement.
+     * @return Fast update task if the given statement filters by a single non-expression key; {@code null} otherwise.
+     */
+    public static FastUpdate getFastDeleteArgs(GridSqlDelete del) {
+        IgnitePair<GridSqlElement> filter = findKeyValueEqualityCondition(del.where());
+
+        if (filter == null)
+            return null;
+
+        return FastUpdate.create(filter.getKey(), filter.getValue(), null);
+    }
+
+    /**
+     * @param where Element to test.
+     * @return Pair of key and (optional) value operands if the element matches {@code WHERE _key = ?}
+     * (optionally AND {@code _val = ?}) with literal or param operands; {@code null} otherwise.
+     */
+    @SuppressWarnings("RedundantCast")
+    private static IgnitePair<GridSqlElement> findKeyValueEqualityCondition(GridSqlElement where) {
+        if (where == null || !(where instanceof GridSqlOperation))
+            return null;
+
+        GridSqlOperation whereOp = (GridSqlOperation) where;
+
+        // Does this WHERE limit only by _key?
+        if (isKeyEqualityCondition(whereOp))
+            return new IgnitePair<>((GridSqlElement)whereOp.child(1), null);
+
+        // Or maybe it limits both by _key and _val?
+        if (whereOp.operationType() != GridSqlOperationType.AND)
+            return null;
+
+        GridSqlElement left = whereOp.child(0);
+
+        GridSqlElement right = whereOp.child(1);
+
+        if (!(left instanceof GridSqlOperation && right instanceof GridSqlOperation))
+            return null;
+
+        GridSqlOperation leftOp = (GridSqlOperation) left;
+
+        GridSqlOperation rightOp = (GridSqlOperation) right;
+
+        if (isKeyEqualityCondition(leftOp)) { // _key = ? and _val = ?
+            if (!isValueEqualityCondition(rightOp))
+                return null;
+
+            return new IgnitePair<>((GridSqlElement)leftOp.child(1), (GridSqlElement)rightOp.child(1));
+        }
+        else if (isKeyEqualityCondition(rightOp)) { // _val = ? and _key = ?
+            if (!isValueEqualityCondition(leftOp))
+                return null;
+
+            return new IgnitePair<>((GridSqlElement)rightOp.child(1), (GridSqlElement)leftOp.child(1));
+        }
+        else // Neither
+            return null;
+    }
+
+    /**
+     * @param op Operation.
+     * @param key If {@code true}, check for a key equality condition;
+     *            otherwise check for a value equality condition.
+     * @return Whether this condition is of form {@code colName} = ?
+     */
+    private static boolean isEqualityCondition(GridSqlOperation op, boolean key) {
+        if (op.operationType() != GridSqlOperationType.EQUAL)
+            return false;
+
+        GridSqlElement left = op.child(0);
+        GridSqlElement right = op.child(1);
+
+        if (!(left instanceof GridSqlColumn))
+            return false;
+
+        GridSqlColumn column = (GridSqlColumn)left;
+        if (!(column.column().getTable() instanceof GridH2Table))
+            return false;
+
+        GridH2RowDescriptor desc =((GridH2Table) column.column().getTable()).rowDescriptor();
+
+        return  (key ? desc.isKeyColumn(column.column().getColumnId()) :
+                       desc.isValueColumn(column.column().getColumnId())) &&
+                (right instanceof GridSqlConst || right instanceof GridSqlParameter);
+    }
+
+    /**
+     * @param op Operation.
+     * @return Whether this condition is of form _key = ?
+     */
+    private static boolean isKeyEqualityCondition(GridSqlOperation op) {
+        return isEqualityCondition(op, true);
+    }
+
+    /**
+     * @param op Operation.
+     * @return Whether this condition is of form _val = ?
+     */
+    private static boolean isValueEqualityCondition(GridSqlOperation op) {
+        return isEqualityCondition(op, false);
+    }
+
+
+    /**
+     * Generate SQL SELECT based on UPDATE's WHERE, LIMIT, etc.
+     *
+     * @param update Update statement.
+     * @param keysParamIdx Index of new param for the array of keys.
+     * @return SELECT statement.
+     */
+    public static GridSqlSelect selectForUpdate(GridSqlUpdate update, @Nullable Integer keysParamIdx) {
+        GridSqlSelect mapQry = new GridSqlSelect();
+
+        mapQry.from(update.target());
+
+        Set<GridSqlTable> tbls = new HashSet<>();
+
+        collectAllGridTablesInTarget(update.target(), tbls);
+
+        assert tbls.size() == 1 : "Failed to determine target table for UPDATE";
+
+        GridSqlTable tbl = tbls.iterator().next();
+
+        GridH2Table gridTbl = tbl.dataTable();
+
+        assert gridTbl != null : "Failed to determine target grid table for UPDATE";
+
+        Column h2KeyCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.KEY_COL);
+
+        Column h2ValCol = gridTbl.getColumn(GridH2KeyValueRowOnheap.VAL_COL);
+
+        GridSqlColumn keyCol = new GridSqlColumn(h2KeyCol, tbl, h2KeyCol.getName());
+        keyCol.resultType(GridSqlType.fromColumn(h2KeyCol));
+
+        GridSqlColumn valCol = new GridSqlColumn(h2ValCol, tbl, h2ValCol.getName());
+        valCol.resultType(GridSqlType.fromColumn(h2ValCol));
+
+        mapQry.addColumn(keyCol, true);
+        mapQry.addColumn(valCol, true);
+
+        for (GridSqlColumn c : update.cols()) {
+            String newColName = Parser.quoteIdentifier("_upd_" + c.columnName());
+            // We have to use aliases to cover cases when the user
+            // wants to update _val field directly (if it's a literal)
+            GridSqlAlias alias = new GridSqlAlias(newColName, elementOrDefault(update.set().get(c.columnName()), c), true);
+            alias.resultType(c.resultType());
+            mapQry.addColumn(alias, true);
+        }
+
+        GridSqlElement where = update.where();
+        if (keysParamIdx != null)
+            where = injectKeysFilterParam(where, keyCol, keysParamIdx);
+
+        mapQry.where(where);
+        mapQry.limit(update.limit());
+
+        return mapQry;
+    }
+
+    /**
+     * Do what we can to compute default value for this column (mimics H2 behavior).
+     * @see Table#getDefaultValue
+     * @see Column#validateConvertUpdateSequence
+     * @param el SQL element.
+     * @param col Column.
+     * @return {@link GridSqlConst#NULL}, if {@code el} is null, or {@code el} if
+     * it's not {@link GridSqlKeyword#DEFAULT}, or computed default value.
+     */
+    private static GridSqlElement elementOrDefault(GridSqlElement el, GridSqlColumn col) {
+        if (el == null)
+            return GridSqlConst.NULL;
+
+        if (el != GridSqlKeyword.DEFAULT)
+            return el;
+
+        Column h2Col = col.column();
+
+        Expression dfltExpr = h2Col.getDefaultExpression();
+
+        Value dfltVal;
+
+        try {
+            dfltVal = dfltExpr != null ? dfltExpr.getValue(null) : null;
+        }
+        catch (Exception ignored) {
+            throw new IgniteSQLException("Failed to evaluate default value for a column " + col.columnName());
+        }
+
+        if (dfltVal != null)
+            return new GridSqlConst(dfltVal);
+
+        int type = h2Col.getType();
+
+        DataType dt = DataType.getDataType(type);
+
+        if (dt.decimal)
+            dfltVal = ValueInt.get(0).convertTo(type);
+        else if (dt.type == Value.TIMESTAMP)
+            dfltVal = ValueTimestamp.fromMillis(U.currentTimeMillis());
+        else if (dt.type == Value.TIME)
+            dfltVal = ValueTime.fromNanos(0);
+        else if (dt.type == Value.DATE)
+            dfltVal = ValueDate.fromMillis(U.currentTimeMillis());
+        else
+            dfltVal = ValueString.get("").convertTo(type);
+
+        return new GridSqlConst(dfltVal);
+    }
+
+    /**
+     * Append additional condition to WHERE for it to select only specific keys.
+     *
+     * @param where Initial condition.
+     * @param keyCol Column to base the new condition on.
+     * @return New condition.
+     */
+    private static GridSqlElement injectKeysFilterParam(GridSqlElement where, GridSqlColumn keyCol, int paramIdx) {
+        // Yes, we need a subquery for "WHERE _key IN ?" to work with param being an array without dirty query rewriting.
+        GridSqlSelect sel = new GridSqlSelect();
+
+        GridSqlFunction from = new GridSqlFunction(GridSqlFunctionType.TABLE);
+
+        sel.from(from);
+
+        GridSqlColumn col = new GridSqlColumn(null, from, null, "TABLE", "_IGNITE_ERR_KEYS");
+
+        sel.addColumn(col, true);
+
+        GridSqlAlias alias = new GridSqlAlias("_IGNITE_ERR_KEYS", new GridSqlParameter(paramIdx));
+
+        alias.resultType(keyCol.resultType());
+
+        from.addChild(alias);
+
+        GridSqlElement e = new GridSqlOperation(GridSqlOperationType.IN, keyCol, new GridSqlSubquery(sel));
+
+        if (where == null)
+            return e;
+        else
+            return new GridSqlOperation(GridSqlOperationType.AND, where, e);
+    }
+
+    /**
+     * @param qry Select.
+     * @param params Parameters.
+     * @param target Extracted parameters.
+     * @param paramIdxs Parameter indexes.
+     * @return Extracted parameters list.
+     */
+    @SuppressWarnings("unused")
+    private static List<Object> findParams(GridSqlQuery qry, Object[] params, ArrayList<Object> target,
+        IntArray paramIdxs) {
+        if (qry instanceof GridSqlSelect)
+            return findParams((GridSqlSelect)qry, params, target, paramIdxs);
+
+        GridSqlUnion union = (GridSqlUnion)qry;
+
+        findParams(union.left(), params, target, paramIdxs);
+        findParams(union.right(), params, target, paramIdxs);
+
+        findParams((GridSqlElement)qry.limit(), params, target, paramIdxs);
+        findParams((GridSqlElement)qry.offset(), params, target, paramIdxs);
+
+        return target;
+    }
+
+    /**
+     * @param qry Select.
+     * @param params Parameters.
+     * @param target Extracted parameters.
+     * @param paramIdxs Parameter indexes.
+     * @return Extracted parameters list.
+     */
+    private static List<Object> findParams(GridSqlSelect qry, Object[] params, ArrayList<Object> target,
+        IntArray paramIdxs) {
+        if (params.length == 0)
+            return target;
+
+        for (GridSqlAst el : qry.columns(false))
+            findParams((GridSqlElement)el, params, target, paramIdxs);
+
+        findParams((GridSqlElement)qry.from(), params, target, paramIdxs);
+        findParams((GridSqlElement)qry.where(), params, target, paramIdxs);
+
+        // Don't search in GROUP BY and HAVING since they expected to be in select list.
+
+        findParams((GridSqlElement)qry.limit(), params, target, paramIdxs);
+        findParams((GridSqlElement)qry.offset(), params, target, paramIdxs);
+
+        return target;
+    }
+
+    /**
+     * @param el Element.
+     * @param params Parameters.
+     * @param target Extracted parameters.
+     * @param paramIdxs Parameter indexes.
+     */
+    private static void findParams(@Nullable GridSqlElement el, Object[] params, ArrayList<Object> target,
+        IntArray paramIdxs) {
+        if (el == null)
+            return;
+
+        if (el instanceof GridSqlParameter) {
+            // H2 supports queries like "select ?5", but the first 4 non-existing parameters need to be set to some value.
+            // Here we will set them to NULL.
+            final int idx = ((GridSqlParameter)el).index();
+
+            while (target.size() < idx)
+                target.add(null);
+
+            if (params.length <= idx)
+                throw new IgniteException("Invalid number of query parameters. " +
+                    "Cannot find " + idx + " parameter.");
+
+            Object param = params[idx];
+
+            if (idx == target.size())
+                target.add(param);
+            else
+                target.set(idx, param);
+
+            paramIdxs.add(idx);
+        }
+        else if (el instanceof GridSqlSubquery)
+            findParams(((GridSqlSubquery)el).subquery(), params, target, paramIdxs);
+        else
+            for (int i = 0; i < el.size(); i++)
+                findParams((GridSqlElement)el.child(i), params, target, paramIdxs);
+    }
+
+    /**
+     * Processes all the tables and subqueries using the given closure.
+     *
+     * @param from FROM element.
+     * @param c Closure each found table and subquery will be passed to. If it returns {@code true}, the search stops.
+     * @return {@code true} if a matching element was found.
+     */
+    @SuppressWarnings("RedundantCast")
+    private static boolean findTablesInFrom(GridSqlElement from, IgnitePredicate<GridSqlElement> c) {
+        if (from == null)
+            return false;
+
+        if (from instanceof GridSqlTable || from instanceof GridSqlSubquery)
+            return c.apply(from);
+
+        if (from instanceof GridSqlJoin) {
+            // Left and right.
+            if (findTablesInFrom((GridSqlElement)from.child(0), c))
+                return true;
+
+            if (findTablesInFrom((GridSqlElement)from.child(1), c))
+                return true;
+
+            // We don't process ON condition because it is not a joining part of from here.
+            return false;
+        }
+        else if (from instanceof GridSqlAlias)
+            return findTablesInFrom((GridSqlElement)from.child(), c);
+        else if (from instanceof GridSqlFunction)
+            return false;
+
+        throw new IllegalStateException(from.getClass().getName() + " : " + from.getSQL());
+    }
+
+    /**
+     * @param from From element.
+     * @param tbls Tables.
+     */
+    public static void collectAllGridTablesInTarget(GridSqlElement from, final Set<GridSqlTable> tbls) {
+        findTablesInFrom(from, new IgnitePredicate<GridSqlElement>() {
+            @Override public boolean apply(GridSqlElement el) {
+                if (el instanceof GridSqlTable)
+                    tbls.add((GridSqlTable)el);
+
+                return false;
+            }
+        });
+    }
+
+    /**
+     * @param target Expression to extract the table from.
+     * @return Back end table for this element.
+     */
+    public static GridSqlTable gridTableForElement(GridSqlElement target) {
+        Set<GridSqlTable> tbls = new HashSet<>();
+
+        collectAllGridTablesInTarget(target, tbls);
+
+        if (tbls.size() != 1)
+            throw new IgniteSQLException("Failed to determine target table", IgniteQueryErrorCode.TABLE_NOT_FOUND);
+
+        return tbls.iterator().next();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
new file mode 100644
index 0000000..a4a60c3
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlBatchSender.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.util.typedef.F;
+
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.EntryProcessorResult;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode.createJdbcSqlException;
+
+/**
+ * Batch sender class.
+ */
+public class DmlBatchSender {
+    /** Cache context. */
+    private final GridCacheContext cctx;
+
+    /** Batch size. */
+    private final int size;
+
+    /** Batches. */
+    private final Map<UUID, Map<Object, EntryProcessor<Object, Object, Boolean>>> batches = new HashMap<>();
+
+    /** Result count. */
+    private long updateCnt;
+
+    /** Failed keys. */
+    private List<Object> failedKeys;
+
+    /** Exception. */
+    private SQLException err;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache context.
+     * @param size Batch.
+     */
+    public DmlBatchSender(GridCacheContext cctx, int size) {
+        this.cctx = cctx;
+        this.size = size;
+    }
+
+    /**
+     * Add entry to batch.
+     *
+     * @param key Key.
+     * @param proc Processor.
+     */
+    public void add(Object key, EntryProcessor<Object, Object, Boolean> proc) throws IgniteCheckedException {
+        ClusterNode node = cctx.affinity().primaryByKey(key, AffinityTopologyVersion.NONE);
+
+        if (node == null)
+            throw new IgniteCheckedException("Failed to map key to node.");
+
+        UUID nodeId = node.id();
+
+        Map<Object, EntryProcessor<Object, Object, Boolean>> batch = batches.get(nodeId);
+
+        if (batch == null) {
+            batch = new HashMap<>();
+
+            batches.put(nodeId, batch);
+        }
+
+        batch.put(key, proc);
+
+        if (batch.size() >= size) {
+            sendBatch(batch);
+
+            batch.clear();
+        }
+    }
+
+    /**
+     * Flush any remaining entries.
+     *
+     * @throws IgniteCheckedException If failed.
+     */
+    public void flush() throws IgniteCheckedException {
+        for (Map<Object, EntryProcessor<Object, Object, Boolean>> batch : batches.values()) {
+            if (!batch.isEmpty())
+                sendBatch(batch);
+        }
+    }
+
+    /**
+     * @return Update count.
+     */
+    public long updateCount() {
+        return updateCnt;
+    }
+
+    /**
+     * @return Failed keys.
+     */
+    public List<Object> failedKeys() {
+        return failedKeys != null ? failedKeys : Collections.emptyList();
+    }
+
+    /**
+     * @return Error.
+     */
+    public SQLException error() {
+        return err;
+    }
+
+    /**
+     * Send the batch.
+     *
+     * @param batch Batch.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void sendBatch(Map<Object, EntryProcessor<Object, Object, Boolean>> batch)
+        throws IgniteCheckedException {
+        DmlPageProcessingResult pageRes = processPage(cctx, batch);
+
+        updateCnt += pageRes.count();
+
+        if (failedKeys == null)
+            failedKeys = new ArrayList<>();
+
+        failedKeys.addAll(F.asList(pageRes.errorKeys()));
+
+        if (pageRes.error() != null) {
+            if (err == null)
+                err = error();
+            else
+                err.setNextException(error());
+        }
+    }
+
+    /**
+     * Execute given entry processors and collect errors, if any.
+     * @param cctx Cache context.
+     * @param rows Rows to process.
+     * @return Triple [number of rows actually changed; keys that failed to update (duplicates or concurrently
+     *     updated ones); chain of exceptions for all keys whose processing resulted in error, or null for no errors].
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings({"unchecked", "ConstantConditions"})
+    private static DmlPageProcessingResult processPage(GridCacheContext cctx,
+        Map<Object, EntryProcessor<Object, Object, Boolean>> rows) throws IgniteCheckedException {
+        Map<Object, EntryProcessorResult<Boolean>> res = cctx.cache().invokeAll(rows);
+
+        if (F.isEmpty(res))
+            return new DmlPageProcessingResult(rows.size(), null, null);
+
+        DmlPageProcessingErrorResult splitRes = splitErrors(res);
+
+        int keysCnt = splitRes.errorKeys().length;
+
+        return new DmlPageProcessingResult(rows.size() - keysCnt - splitRes.errorCount(), splitRes.errorKeys(),
+            splitRes.error());
+    }
+
+    /**
+     * Process errors of entry processor - split the keys into duplicated/concurrently modified and those whose
+     * processing yielded an exception.
+     *
+     * @param res Result of {@link GridCacheAdapter#invokeAll)}
+     * @return pair [array of duplicated/concurrently modified keys, SQL exception for erroneous keys] (exception is
+     * null if all keys are duplicates/concurrently modified ones).
+     */
+    private static DmlPageProcessingErrorResult splitErrors(Map<Object, EntryProcessorResult<Boolean>> res) {
+        Set<Object> errKeys = new LinkedHashSet<>(res.keySet());
+
+        SQLException currSqlEx = null;
+
+        SQLException firstSqlEx = null;
+
+        int errors = 0;
+
+        // Let's form a chain of SQL exceptions
+        for (Map.Entry<Object, EntryProcessorResult<Boolean>> e : res.entrySet()) {
+            try {
+                e.getValue().get();
+            }
+            catch (EntryProcessorException ex) {
+                SQLException next = createJdbcSqlException("Failed to process key '" + e.getKey() + '\'',
+                    IgniteQueryErrorCode.ENTRY_PROCESSING);
+
+                next.initCause(ex);
+
+                if (currSqlEx != null)
+                    currSqlEx.setNextException(next);
+                else
+                    firstSqlEx = next;
+
+                currSqlEx = next;
+
+                errKeys.remove(e.getKey());
+
+                errors++;
+            }
+        }
+
+        return new DmlPageProcessingErrorResult(errKeys.toArray(), firstSqlEx, errors);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java
new file mode 100644
index 0000000..44c6e22
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlDistributedPlanInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import java.util.List;
+
/**
 * Additional information about distributed update plan.
 */
public final class DmlDistributedPlanInfo {
    /** {@code True} if the update involves only replicated caches. */
    private final boolean replicatedOnly;

    /** Identifiers of caches involved in update (used for cluster nodes mapping). */
    private final List<Integer> cacheIds;

    /**
     * Constructor.
     *
     * @param replicatedOnly Whether all caches are replicated.
     * @param cacheIds List of cache identifiers.
     */
    public DmlDistributedPlanInfo(boolean replicatedOnly, List<Integer> cacheIds) {
        this.cacheIds = cacheIds;
        this.replicatedOnly = replicatedOnly;
    }

    /**
     * @return {@code true} when every cache touched by the update is replicated.
     */
    public boolean isReplicatedOnly() {
        return replicatedOnly;
    }

    /**
     * @return Identifiers of the caches involved in the update.
     */
    public List<Integer> getCacheIds() {
        return cacheIds;
    }
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java
new file mode 100644
index 0000000..02e7359
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingErrorResult.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import javax.cache.processor.EntryProcessor;
+import java.sql.SQLException;
+
/**
 * Result of splitting keys whose processing resulted into an exception from those skipped by
 * logic of {@link EntryProcessor}s (most likely INSERT duplicates, or UPDATE/DELETE keys whose values
 * had been modified concurrently), counting and collecting entry processor exceptions.
 */
public final class DmlPageProcessingErrorResult {
    /** Keys that failed to be processed by {@link EntryProcessor} (not due to an exception). */
    private final Object[] errKeys;

    /** Number of entries whose processing resulted into an exception. */
    private final int cnt;

    /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
    private final SQLException ex;

    /**
     * Constructor.
     *
     * @param errKeys Keys skipped by entry processors (not due to an exception); normalized to an
     *     empty array when {@code null}.
     * @param ex Chain of exceptions for keys that failed with an error, or {@code null} for none.
     * @param exCnt Number of entries whose processing threw an exception.
     */
    @SuppressWarnings("ConstantConditions")
    public DmlPageProcessingErrorResult(@NotNull Object[] errKeys, SQLException ex, int exCnt) {
        errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
        // The exception chain must be non-null if and only if at least one entry failed with an
        // exception (exCnt > 0), and null when exCnt == 0.
        assert exCnt == 0 ^ ex != null;

        this.errKeys = errKeys;
        this.cnt = exCnt;
        this.ex = ex;
    }

    /**
     * @return Number of entries whose processing resulted into an exception.
     */
    public int errorCount() {
        return cnt;
    }

    /**
     * @return Error keys.
     */
    public Object[] errorKeys() {
        return errKeys;
    }

    /**
     * @return Error.
     */
    @Nullable
    public SQLException error() {
        return ex;
    }
}

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java
new file mode 100644
index 0000000..f2db3a7
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlPageProcessingResult.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import java.sql.SQLException;
+
+/**
+ * Result of processing an individual page with {@link IgniteCache#invokeAll} including error details, if any.
+ */
+public final class DmlPageProcessingResult {
+    /** Number of successfully processed items. */
+    private final long cnt;
+
+    /** Keys that failed to be updated or deleted due to concurrent modification of values. */
+    private final Object[] errKeys;
+
+    /** Chain of exceptions corresponding to failed keys. Null if no keys yielded an exception. */
+    private final SQLException ex;
+
+    /** */
+    @SuppressWarnings("ConstantConditions")
+    public DmlPageProcessingResult(long cnt, Object[] errKeys, @Nullable SQLException ex) {
+        this.cnt = cnt;
+        this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+        this.ex = ex;
+    }
+
+    /**
+     * @return Number of successfully processed items.
+     */
+    public long count() {
+        return cnt;
+    }
+
+    /**
+     * @return Error keys.
+     */
+    public Object[] errorKeys() {
+        return errKeys;
+    }
+
+    /**
+     * @return Error.
+     */
+    @Nullable public SQLException error() {
+        return ex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/03bb5513/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
new file mode 100644
index 0000000..6621fc2
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/DmlUtils.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.dml;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.query.IgniteSQLException;
+import org.apache.ignite.internal.processors.query.h2.H2Utils;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.h2.util.DateTimeUtils;
+import org.h2.util.LocalDateTimeUtils;
+import org.h2.value.Value;
+import org.h2.value.ValueDate;
+import org.h2.value.ValueTime;
+import org.h2.value.ValueTimestamp;
+
+import java.lang.reflect.Array;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Date;
+
/**
 * DML utility methods.
 */
public class DmlUtils {
    /**
     * Convert value to column's expected type by means of H2.
     *
     * <p>Handles several special cases (exact {@code java.util.Date}, UUID-as-bytes, java.time
     * types, reference-type arrays) before delegating the general case to
     * {@link H2Utils#convert}.
     *
     * @param val Source value.
     * @param desc Row descriptor.
     * @param expCls Expected value class.
     * @param type Expected column type to convert to.
     * @return Converted object, or {@code null} when {@code val} is {@code null}.
     * @throws IgniteCheckedException if failed.
     * @throws IgniteSQLException with code {@code CONVERSION_FAILED} when the conversion itself
     *     throws (the original exception is preserved as the cause).
     */
    @SuppressWarnings({"ConstantConditions", "SuspiciousSystemArraycopy"})
    public static Object convert(Object val, GridH2RowDescriptor desc, Class<?> expCls, int type)
        throws IgniteCheckedException {
        if (val == null)
            return null;

        Class<?> currCls = val.getClass();

        try {
            if (val instanceof Date && currCls != Date.class && expCls == Date.class) {
                // H2 thinks that java.util.Date is always a Timestamp, while binary marshaller expects
                // precise Date instance. Let's satisfy it.
                return new Date(((Date) val).getTime());
            }

            // User-given UUID is always serialized by H2 to byte array, so we have to deserialize manually
            if (type == Value.UUID && currCls == byte[].class)
                return U.unmarshal(desc.context().marshaller(), (byte[]) val,
                    U.resolveClassLoader(desc.context().gridConfig()));

            // java.time conversions apply only when the Java 8 date-time API is present at runtime.
            if (LocalDateTimeUtils.isJava8DateApiPresent()) {
                if (val instanceof Timestamp && LocalDateTimeUtils.isLocalDateTime(expCls))
                    return LocalDateTimeUtils.valueToLocalDateTime(ValueTimestamp.get((Timestamp) val));

                if (val instanceof Date && LocalDateTimeUtils.isLocalDate(expCls))
                    return LocalDateTimeUtils.valueToLocalDate(ValueDate.fromDateValue(
                        DateTimeUtils.dateValueFromDate(((Date) val).getTime())));

                if (val instanceof Time && LocalDateTimeUtils.isLocalTime(expCls))
                    return LocalDateTimeUtils.valueToLocalTime(ValueTime.get((Time) val));
            }

            // We have to convert arrays of reference types manually -
            // see https://issues.apache.org/jira/browse/IGNITE-4327
            // Still, we only can convert from Object[] to something more precise.
            if (type == Value.ARRAY && currCls != expCls) {
                if (currCls != Object[].class)
                    throw new IgniteCheckedException("Unexpected array type - only conversion from Object[] " +
                        "is assumed");

                // Why would otherwise type be Value.ARRAY?
                assert expCls.isArray();

                Object[] curr = (Object[]) val;

                Object newArr = Array.newInstance(expCls.getComponentType(), curr.length);

                // System.arraycopy performs the per-element store checks against the new component type.
                System.arraycopy(curr, 0, newArr, 0, curr.length);

                return newArr;
            }

            return H2Utils.convert(val, desc, type);
        }
        catch (Exception e) {
            // NOTE(review): this also catches and re-wraps the IgniteCheckedException thrown above
            // for an unexpected array type, replacing its message with the generic "conversion
            // failed" one — confirm this is intended.
            throw new IgniteSQLException("Value conversion failed [from=" + currCls.getName() + ", to=" +
                expCls.getName() +']', IgniteQueryErrorCode.CONVERSION_FAILED, e);
        }
    }

    /**
     * Private constructor (utility class, no instances).
     */
    private DmlUtils() {
        // No-op.
    }
}


Mime
View raw message