ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [3/4] ignite git commit: IGNITE-6024: SQL: Implemented "skipReducerOnUpdate" flag. This closes #2488.
Date Fri, 13 Oct 2017 09:30:07 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
index 98117b2..9e55442 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DmlStatementsProcessor.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2;
 
 import java.lang.reflect.Array;
+import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.sql.Time;
@@ -150,7 +151,8 @@ public class DmlStatementsProcessor {
      * Execute DML statement, possibly with few re-attempts in case of concurrent data modifications.
      *
      * @param schemaName Schema.
-     * @param prepared Prepared JDBC statement.
+     * @param conn Connection.
+     * @param prepared Prepared statement.
      * @param fieldsQry Original query.
      * @param loc Query locality flag.
      * @param filters Cache name and key filter.
@@ -158,13 +160,14 @@ public class DmlStatementsProcessor {
      * @return Update result (modified items count and failed keys).
      * @throws IgniteCheckedException if failed.
      */
-    private UpdateResult updateSqlFields(String schemaName, Prepared prepared, SqlFieldsQuery fieldsQry,
-        boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel) throws IgniteCheckedException {
+    private UpdateResult updateSqlFields(String schemaName, Connection conn, Prepared prepared,
+        SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters, GridQueryCancel cancel)
+        throws IgniteCheckedException {
         Object[] errKeys = null;
 
         long items = 0;
 
-        UpdatePlan plan = getPlanForStatement(schemaName, prepared, null);
+        UpdatePlan plan = getPlanForStatement(schemaName, conn, prepared, fieldsQry, loc, null);
 
         GridCacheContext<?, ?> cctx = plan.tbl.rowDescriptor().context();
 
@@ -188,14 +191,14 @@ public class DmlStatementsProcessor {
             UpdateResult r;
 
             try {
-                r = executeUpdateStatement(schemaName, cctx, prepared, fieldsQry, loc, filters, cancel, errKeys);
+                r = executeUpdateStatement(schemaName, cctx, conn, prepared, fieldsQry, loc, filters, cancel, errKeys);
             }
             finally {
                 cctx.operationContextPerCall(opCtx);
             }
 
-            items += r.cnt;
-            errKeys = r.errKeys;
+            items += r.counter();
+            errKeys = r.errorKeys();
 
             if (F.isEmpty(errKeys))
                 break;
@@ -213,19 +216,22 @@ public class DmlStatementsProcessor {
 
     /**
      * @param schemaName Schema.
-     * @param p Prepared.
+     * @param c Connection.
+     * @param p Prepared statement.
      * @param fieldsQry Initial query
      * @param cancel Query cancel.
      * @return Update result wrapped into {@link GridQueryFieldsResult}
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Prepared p,
+    QueryCursorImpl<List<?>> updateSqlFieldsDistributed(String schemaName, Connection c, Prepared p,
         SqlFieldsQuery fieldsQry, GridQueryCancel cancel) throws IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schemaName, p, fieldsQry, false, null, cancel);
+        UpdateResult res = updateSqlFields(schemaName, c, p, fieldsQry, false, null, cancel);
+
+        checkUpdateResult(res);
 
         QueryCursorImpl<List<?>> resCur = (QueryCursorImpl<List<?>>)new QueryCursorImpl(Collections.singletonList
-            (Collections.singletonList(res.cnt)), cancel, false);
+            (Collections.singletonList(res.counter())), cancel, false);
 
         resCur.fieldsMeta(UPDATE_RESULT_META);
 
@@ -236,6 +242,7 @@ public class DmlStatementsProcessor {
      * Execute DML statement on local cache.
      *
      * @param schemaName Schema.
+     * @param conn Connection.
      * @param stmt Prepared statement.
      * @param fieldsQry Fields query.
      * @param filters Cache name and key filter.
@@ -244,14 +251,14 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("unchecked")
-    GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, PreparedStatement stmt,
+    GridQueryFieldsResult updateSqlFieldsLocal(String schemaName, Connection conn, PreparedStatement stmt,
         SqlFieldsQuery fieldsQry, IndexingQueryFilter filters, GridQueryCancel cancel)
         throws IgniteCheckedException {
-        UpdateResult res = updateSqlFields(schemaName,  GridSqlQueryParser.prepared(stmt), fieldsQry, true,
+        UpdateResult res = updateSqlFields(schemaName, conn, GridSqlQueryParser.prepared(stmt), fieldsQry, true,
             filters, cancel);
 
         return new GridQueryFieldsResultAdapter(UPDATE_RESULT_META,
-            new IgniteSingletonIterator(Collections.singletonList(res.cnt)));
+            new IgniteSingletonIterator(Collections.singletonList(res.counter())));
     }
 
     /**
@@ -272,7 +279,7 @@ public class DmlStatementsProcessor {
 
         assert p != null;
 
-        UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, null);
+        UpdatePlan plan = UpdatePlanBuilder.planForStatement(p, true, idx, null, null, null);
 
         if (!F.eq(streamer.cacheName(), plan.tbl.rowDescriptor().context().name()))
             throw new IgniteSQLException("Cross cache streaming is not supported, please specify cache explicitly" +
@@ -340,6 +347,7 @@ public class DmlStatementsProcessor {
      *
      * @param schemaName Schema name.
      * @param cctx Cache context.
+     * @param c Connection.
      * @param prepared Prepared statement for DML query.
      * @param fieldsQry Fields query.
      * @param loc Local query flag.
@@ -350,14 +358,14 @@ public class DmlStatementsProcessor {
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings({"ConstantConditions", "unchecked"})
-    private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx,
+    private UpdateResult executeUpdateStatement(String schemaName, final GridCacheContext cctx, Connection c,
         Prepared prepared, SqlFieldsQuery fieldsQry, boolean loc, IndexingQueryFilter filters,
         GridQueryCancel cancel, Object[] failedKeys) throws IgniteCheckedException {
         int mainCacheId = CU.cacheId(cctx.name());
 
         Integer errKeysPos = null;
 
-        UpdatePlan plan = getPlanForStatement(schemaName, prepared, errKeysPos);
+        UpdatePlan plan = getPlanForStatement(schemaName, c, prepared, fieldsQry, loc, errKeysPos);
 
         if (plan.fastUpdateArgs != null) {
             assert F.isEmpty(failedKeys) && errKeysPos == null;
@@ -365,6 +373,14 @@ public class DmlStatementsProcessor {
             return doFastUpdate(plan, fieldsQry.getArgs());
         }
 
+        if (plan.distributed != null) {
+            UpdateResult result = doDistributedUpdate(schemaName, fieldsQry, plan, cancel);
+
+            // null is returned in case not all nodes support distributed DML.
+            if (result != null)
+                return result;
+        }
+
         assert !F.isEmpty(plan.selectQry);
 
         QueryCursorImpl<List<?>> cur;
@@ -401,18 +417,31 @@ public class DmlStatementsProcessor {
 
         int pageSize = loc ? 0 : fieldsQry.getPageSize();
 
+        return processDmlSelectResult(cctx, plan, cur, pageSize);
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @param plan Update plan.
+     * @param cursor Cursor over select results.
+     * @param pageSize Page size.
+     * @return Pair [number of successfully processed items; keys that have failed to be processed]
+     * @throws IgniteCheckedException if failed.
+     */
+    private UpdateResult processDmlSelectResult(GridCacheContext cctx, UpdatePlan plan, Iterable<List<?>> cursor,
+        int pageSize) throws IgniteCheckedException {
         switch (plan.mode) {
             case MERGE:
-                return new UpdateResult(doMerge(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+                return new UpdateResult(doMerge(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
 
             case INSERT:
-                return new UpdateResult(doInsert(plan, cur, pageSize), X.EMPTY_OBJECT_ARRAY);
+                return new UpdateResult(doInsert(plan, cursor, pageSize), X.EMPTY_OBJECT_ARRAY);
 
             case UPDATE:
-                return doUpdate(plan, cur, pageSize);
+                return doUpdate(plan, cursor, pageSize);
 
             case DELETE:
-                return doDelete(cctx, cur, pageSize);
+                return doDelete(cctx, cursor, pageSize);
 
             default:
                 throw new IgniteSQLException("Unexpected DML operation [mode=" + plan.mode + ']',
@@ -425,20 +454,23 @@ public class DmlStatementsProcessor {
      * if available.
      *
      * @param schema Schema.
-     * @param p Prepared JDBC statement.
+     * @param conn Connection.
+     * @param p Prepared statement.
+     * @param fieldsQry Original fields query.
+     * @param loc Local query flag.
      * @return Update plan.
      */
     @SuppressWarnings({"unchecked", "ConstantConditions"})
-    private UpdatePlan getPlanForStatement(String schema, Prepared p, @Nullable Integer errKeysPos)
-        throws IgniteCheckedException {
-        H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL());
+    private UpdatePlan getPlanForStatement(String schema, Connection conn, Prepared p, SqlFieldsQuery fieldsQry,
+        boolean loc, @Nullable Integer errKeysPos) throws IgniteCheckedException {
+        H2DmlPlanKey planKey = new H2DmlPlanKey(schema, p.getSQL(), loc, fieldsQry);
 
         UpdatePlan res = (errKeysPos == null ? planCache.get(planKey) : null);
 
         if (res != null)
             return res;
 
-        res = UpdatePlanBuilder.planForStatement(p, errKeysPos);
+        res = UpdatePlanBuilder.planForStatement(p, loc, idx, conn, fieldsQry, errKeysPos);
 
         // Don't cache re-runs
         if (errKeysPos == null)
@@ -449,6 +481,7 @@ public class DmlStatementsProcessor {
 
     /**
      * Perform single cache operation based on given args.
+     * @param plan Update plan.
      * @param args Query parameters.
      * @return 1 if an item was affected, 0 otherwise.
      * @throws IgniteCheckedException if failed.
@@ -487,6 +520,25 @@ public class DmlStatementsProcessor {
     }
 
     /**
+     * @param schemaName Schema name.
+     * @param fieldsQry Initial query.
+     * @param plan Update plan.
+     * @param cancel Cancel state.
+     * @return Update result.
+     * @throws IgniteCheckedException if failed.
+     */
+    private UpdateResult doDistributedUpdate(String schemaName, SqlFieldsQuery fieldsQry, UpdatePlan plan,
+        GridQueryCancel cancel) throws IgniteCheckedException {
+        assert plan.distributed != null;
+
+        if (cancel == null)
+            cancel = new GridQueryCancel();
+
+        return idx.runDistributedUpdate(schemaName, fieldsQry, plan.distributed.getCacheIds(),
+            plan.distributed.isReplicatedOnly(), cancel);
+    }
+
+    /**
      * Perform DELETE operation on top of results of SELECT.
      * @param cctx Cache context.
      * @param cursor SELECT results.
@@ -573,7 +625,7 @@ public class DmlStatementsProcessor {
 
                 GridQueryProperty prop = plan.tbl.rowDescriptor().type().property(plan.colNames[i]);
 
-                assert prop != null;
+                assert prop != null : "Unknown property: " + plan.colNames[i];
 
                 newColVals.put(plan.colNames[i], convert(row.get(i + 2), desc, prop.type(), plan.colTypes[i]));
             }
@@ -981,6 +1033,31 @@ public class DmlStatementsProcessor {
         return new IgniteBiTuple<>(key, val);
     }
 
+    /**
+     *
+     * @param schemaName Schema name.
+     * @param stmt Prepared statement.
+     * @param fldsQry Query.
+     * @param filter Filter.
+     * @param cancel Cancel state.
+     * @param local Locality flag.
+     * @return Update result.
+     * @throws IgniteCheckedException if failed.
+     */
+    UpdateResult mapDistributedUpdate(String schemaName, PreparedStatement stmt, SqlFieldsQuery fldsQry,
+        IndexingQueryFilter filter, GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
+        Connection c;
+
+        try {
+            c = stmt.getConnection();
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
+
+        return updateSqlFields(schemaName, c, GridSqlQueryParser.prepared(stmt), fldsQry, local, filter, cancel);
+    }
+
     /** */
     private final static class InsertEntryProcessor implements EntryProcessor<Object, Object, Boolean> {
         /** Value to set. */
@@ -1079,26 +1156,19 @@ public class DmlStatementsProcessor {
         return stmt instanceof Merge || stmt instanceof Insert || stmt instanceof Update || stmt instanceof Delete;
     }
 
-    /** Update result - modifications count and keys to re-run query with, if needed. */
-    private final static class UpdateResult {
-        /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
-        final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
-
-        /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
-        final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
-
-        /** Number of processed items. */
-        final long cnt;
+    /**
+     * Check update result for erroneous keys and throws concurrent update exception if necessary.
+     *
+     * @param r Update result.
+     */
+    static void checkUpdateResult(UpdateResult r) {
+        if (!F.isEmpty(r.errorKeys())) {
+            String msg = "Failed to update some keys because they had been modified concurrently " +
+                "[keys=" + r.errorKeys() + ']';
 
-        /** Keys that failed to be updated or deleted due to concurrent modification of values. */
-        @NotNull
-        final Object[] errKeys;
+            SQLException conEx = createJdbcSqlException(msg, IgniteQueryErrorCode.CONCURRENT_UPDATE);
 
-        /** */
-        @SuppressWarnings("ConstantConditions")
-        private UpdateResult(long cnt, Object[] errKeys) {
-            this.cnt = cnt;
-            this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+            throw new IgniteSQLException(conEx);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
index 3a43ea1..455b5e5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/H2DmlPlanKey.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.processors.query.h2;
 
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.internal.processors.query.h2.dml.UpdatePlanBuilder;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 
@@ -30,20 +32,33 @@ public class H2DmlPlanKey {
     /** SQL. */
     private final String sql;
 
+    /** Flags. */
+    private final byte flags;
+
     /**
      * Constructor.
      *
      * @param schemaName Schema name.
      * @param sql SQL.
      */
-    public H2DmlPlanKey(String schemaName, String sql) {
+    public H2DmlPlanKey(String schemaName, String sql, boolean loc, SqlFieldsQuery fieldsQry) {
         this.schemaName = schemaName;
         this.sql = sql;
+
+        if (loc || !UpdatePlanBuilder.isSkipReducerOnUpdateQuery(fieldsQry))
+            this.flags = 0; // flags only relevant for server side updates.
+        else {
+            this.flags = (byte)(1 +
+                (fieldsQry.isDistributedJoins() ? 2 : 0) +
+                (fieldsQry.isEnforceJoinOrder() ? 4 : 0) +
+                (fieldsQry.isCollocated() ? 8 : 0));
+        }
     }
 
     /** {@inheritDoc} */
     @Override public int hashCode() {
-        return 31 * (schemaName != null ? schemaName.hashCode() : 0) + (sql != null ? sql.hashCode() : 0);
+        return 31 * (31 * (schemaName != null ? schemaName.hashCode() : 0) + (sql != null ? sql.hashCode() : 0)) +
+            flags;
     }
 
     /** {@inheritDoc} */
@@ -56,7 +71,7 @@ public class H2DmlPlanKey {
 
         H2DmlPlanKey other = (H2DmlPlanKey)o;
 
-        return F.eq(sql, other.sql) && F.eq(schemaName, other.schemaName);
+        return F.eq(sql, other.sql) && F.eq(schemaName, other.schemaName) && flags == other.flags;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 22ed592..fddd2e8 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -59,7 +59,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.jdbc2.JdbcSqlFieldsQuery;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
@@ -834,7 +834,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             fldsQry.setEnforceJoinOrder(enforceJoinOrder);
             fldsQry.setTimeout(timeout, TimeUnit.MILLISECONDS);
 
-            return dmlProc.updateSqlFieldsLocal(schemaName, stmt, fldsQry, filter, cancel);
+            return dmlProc.updateSqlFieldsLocal(schemaName, conn, stmt, fldsQry, filter, cancel);
         }
         else if (DdlStatementsProcessor.isDdlStatement(p))
             throw new IgniteSQLException("DDL statements are supported for the whole cluster only",
@@ -1215,6 +1215,27 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         };
     }
 
+    /**
+     * Run DML on remote nodes.
+     *
+     * @param schemaName Schema name.
+     * @param fieldsQry Initial update query.
+     * @param cacheIds Cache identifiers.
+     * @param isReplicatedOnly Whether query uses only replicated caches.
+     * @param cancel Cancel state.
+     * @return Update result.
+     */
+    UpdateResult runDistributedUpdate(
+        String schemaName,
+        SqlFieldsQuery fieldsQry,
+        List<Integer> cacheIds,
+        boolean isReplicatedOnly,
+        GridQueryCancel cancel) {
+        return rdcQryExec.update(schemaName, cacheIds, fieldsQry.getSql(), fieldsQry.getArgs(),
+            fieldsQry.isEnforceJoinOrder(), fieldsQry.getPageSize(), fieldsQry.getTimeout(),
+            fieldsQry.getPartitions(), isReplicatedOnly, cancel);
+    }
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public <K, V> QueryCursor<Cache.Entry<K, V>> queryDistributedSql(String schemaName, String cacheName,
@@ -1429,8 +1450,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 if (twoStepQry == null) {
                     if (DmlStatementsProcessor.isDmlStatement(prepared)) {
                         try {
-                            res.add(dmlProc.updateSqlFieldsDistributed(schemaName, prepared,
-                                new SqlFieldsQuery(qry).setSql(sqlQry).setArgs(args), cancel));
+                            res.add(dmlProc.updateSqlFieldsDistributed(schemaName, c, prepared,
+                                qry.copy().setSql(sqlQry).setArgs(args), cancel));
 
                             continue;
                         }
@@ -1452,33 +1473,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                     }
                 }
 
-                LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
-
                 assert twoStepQry != null;
 
-                int tblCnt = twoStepQry.tablesCount();
-
-                if (mainCacheId != null)
-                    caches0.add(mainCacheId);
-
-                if (tblCnt > 0) {
-                    for (QueryTable tblKey : twoStepQry.tables()) {
-                        GridH2Table tbl = dataTable(tblKey);
-
-                        int cacheId = CU.cacheId(tbl.cacheName());
-
-                        caches0.add(cacheId);
-                    }
-                }
+                List<Integer> cacheIds = collectCacheIds(mainCacheId, twoStepQry);
 
-                if (caches0.isEmpty())
+                if (F.isEmpty(cacheIds))
                     twoStepQry.local(true);
                 else {
-                    //Prohibit usage indices with different numbers of segments in same query.
-                    List<Integer> cacheIds = new ArrayList<>(caches0);
-
-                    checkCacheIndexSegmentation(cacheIds);
-
                     twoStepQry.cacheIds(cacheIds);
                     twoStepQry.local(qry.isLocal());
                 }
@@ -1517,7 +1518,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param isQry {@code true} for select queries, otherwise (DML/DDL queries) {@code false}.
      */
     private void checkQueryType(SqlFieldsQuery qry, boolean isQry) {
-        if (qry instanceof JdbcSqlFieldsQuery && ((JdbcSqlFieldsQuery)qry).isQuery() != isQry)
+        if (qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isQuery() != null &&
+            ((SqlFieldsQueryEx)qry).isQuery() != isQry)
             throw new IgniteSQLException("Given statement type does not match that declared by JDBC driver",
                 IgniteQueryErrorCode.STMT_TYPE_MISMATCH);
     }
@@ -1568,6 +1570,29 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Run DML request from other node.
+     *
+     * @param schemaName Schema name.
+     * @param fldsQry Query.
+     * @param filter Filter.
+     * @param cancel Cancel state.
+     * @param local Locality flag.
+     * @return Update result.
+     * @throws IgniteCheckedException if failed.
+     */
+    public UpdateResult mapDistributedUpdate(String schemaName, SqlFieldsQuery fldsQry, IndexingQueryFilter filter,
+        GridQueryCancel cancel, boolean local) throws IgniteCheckedException {
+        Connection conn = connectionForSchema(schemaName);
+
+        H2Utils.setupConnection(conn, false, fldsQry.isEnforceJoinOrder());
+
+        PreparedStatement stmt = preparedStatementWithParams(conn, fldsQry.getSql(),
+            Arrays.asList(fldsQry.getArgs()), true);
+
+        return dmlProc.mapDistributedUpdate(schemaName, stmt, fldsQry, filter, cancel, local);
+    }
+
+    /**
      * @throws IllegalStateException if segmented indices used with non-segmented indices.
      */
     private void checkCacheIndexSegmentation(List<Integer> cacheIds) {
@@ -2524,6 +2549,43 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * Collect cache identifiers from two-step query.
+     *
+     * @param mainCacheId Id of main cache.
+     * @param twoStepQry Two-step query.
+     * @return Result.
+     */
+    public List<Integer> collectCacheIds(@Nullable Integer mainCacheId, GridCacheTwoStepQuery twoStepQry) {
+        LinkedHashSet<Integer> caches0 = new LinkedHashSet<>();
+
+        int tblCnt = twoStepQry.tablesCount();
+
+        if (mainCacheId != null)
+            caches0.add(mainCacheId);
+
+        if (tblCnt > 0) {
+            for (QueryTable tblKey : twoStepQry.tables()) {
+                GridH2Table tbl = dataTable(tblKey);
+
+                int cacheId = CU.cacheId(tbl.cacheName());
+
+                caches0.add(cacheId);
+            }
+        }
+
+        if (caches0.isEmpty())
+            return null;
+        else {
+            //Prohibit usage indices with different numbers of segments in same query.
+            List<Integer> cacheIds = new ArrayList<>(caches0);
+
+            checkCacheIndexSegmentation(cacheIds);
+
+            return cacheIds;
+        }
+    }
+
+    /**
      * Closeable iterator.
      */
     private interface ClIter<X> extends AutoCloseable, Iterator<X> {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
new file mode 100644
index 0000000..de0e63f
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/UpdateResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2;
+
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * Update result - modifications count and keys to re-run query with, if needed.
+ */
+public final class UpdateResult {
+    /** Result to return for operations that affected 1 item - mostly to be used for fast updates and deletes. */
+    final static UpdateResult ONE = new UpdateResult(1, X.EMPTY_OBJECT_ARRAY);
+
+    /** Result to return for operations that affected 0 items - mostly to be used for fast updates and deletes. */
+    final static UpdateResult ZERO = new UpdateResult(0, X.EMPTY_OBJECT_ARRAY);
+
+    /** Number of processed items. */
+    private final long cnt;
+
+    /** Keys that failed to be updated or deleted due to concurrent modification of values. */
+    private final Object[] errKeys;
+
+    /**
+     * Constructor.
+     *
+     * @param cnt Updated rows count.
+     * @param errKeys Array of erroneous keys.
+     */
+    public @SuppressWarnings("ConstantConditions") UpdateResult(long cnt, Object[] errKeys) {
+        this.cnt = cnt;
+        this.errKeys = U.firstNotNull(errKeys, X.EMPTY_OBJECT_ARRAY);
+    }
+
+    /**
+     * @return Update counter.
+     */
+    public long counter() {
+       return cnt;
+    }
+
+    /**
+     * @return Error keys.
+     */
+    public Object[] errorKeys() {
+        return errKeys;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
index b81ac60..a99d811 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlan.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.dml;
 
+import java.util.List;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.util.typedef.F;
 
@@ -64,10 +65,13 @@ public final class UpdatePlan {
     /** Arguments for fast UPDATE or DELETE. */
     public final FastUpdateArguments fastUpdateArgs;
 
+    /** Additional info for distributed update. */
+    public final DistributedPlanInfo distributed;
+
     /** */
     private UpdatePlan(UpdateMode mode, GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier,
         KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,
-        int rowsNum, FastUpdateArguments fastUpdateArgs) {
+        int rowsNum, FastUpdateArguments fastUpdateArgs, DistributedPlanInfo distributed) {
         this.colNames = colNames;
         this.colTypes = colTypes;
         this.rowsNum = rowsNum;
@@ -83,46 +87,84 @@ public final class UpdatePlan {
         this.selectQry = selectQry;
         this.isLocSubqry = isLocSubqry;
         this.fastUpdateArgs = fastUpdateArgs;
+        this.distributed = distributed;
     }
 
     /** */
     public static UpdatePlan forMerge(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier,
         KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,
-        int rowsNum) {
+        int rowsNum, DistributedPlanInfo distributed) {
         assert !F.isEmpty(colNames);
 
         return new UpdatePlan(UpdateMode.MERGE, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx,
-            selectQry, isLocSubqry, rowsNum, null);
+            selectQry, isLocSubqry, rowsNum, null, distributed);
     }
 
     /** */
     public static UpdatePlan forInsert(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier keySupplier,
-        KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry, int rowsNum) {
+        KeyValueSupplier valSupplier, int keyColIdx, int valColIdx, String selectQry, boolean isLocSubqry,
+        int rowsNum, DistributedPlanInfo distributed) {
         assert !F.isEmpty(colNames);
 
-        return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx, valColIdx,
-            selectQry, isLocSubqry, rowsNum, null);
+        return new UpdatePlan(UpdateMode.INSERT, tbl, colNames, colTypes, keySupplier, valSupplier, keyColIdx,
+            valColIdx, selectQry, isLocSubqry, rowsNum, null, distributed);
     }
 
     /** */
     public static UpdatePlan forUpdate(GridH2Table tbl, String[] colNames, int[] colTypes, KeyValueSupplier valSupplier,
-        int valColIdx, String selectQry) {
+        int valColIdx, String selectQry, DistributedPlanInfo distributed) {
         assert !F.isEmpty(colNames);
 
         return new UpdatePlan(UpdateMode.UPDATE, tbl, colNames, colTypes, null, valSupplier, -1, valColIdx, selectQry,
-            false, 0, null);
+            false, 0, null, distributed);
     }
 
     /** */
-    public static UpdatePlan forDelete(GridH2Table tbl, String selectQry) {
-        return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null);
+    public static UpdatePlan forDelete(GridH2Table tbl, String selectQry, DistributedPlanInfo distributed) {
+        return new UpdatePlan(UpdateMode.DELETE, tbl, null, null, null, null, -1, -1, selectQry, false, 0, null,
+            distributed);
     }
 
     /** */
     public static UpdatePlan forFastUpdate(UpdateMode mode, GridH2Table tbl, FastUpdateArguments fastUpdateArgs) {
         assert mode == UpdateMode.UPDATE || mode == UpdateMode.DELETE;
 
-        return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs);
+        return new UpdatePlan(mode, tbl, null, null, null, null, -1, -1, null, false, 0, fastUpdateArgs, null);
     }
 
+    /**
+     * Additional information about distributed update plan.
+     */
+    public final static class DistributedPlanInfo {
+        /** Whether update involves only replicated caches. */
+        private final boolean replicatedOnly;
+
+        /** Identifiers of caches involved in update (used for cluster nodes mapping). */
+        private final List<Integer> cacheIds;
+
+        /**
+         * Constructor.
+         *
+         * @param replicatedOnly Whether all caches are replicated.
+         * @param cacheIds List of cache identifiers.
+         */
+        DistributedPlanInfo(boolean replicatedOnly, List<Integer> cacheIds) {
+            this.replicatedOnly = replicatedOnly;
+            this.cacheIds = cacheIds;
+        }
+
+        /**
+         * @return {@code true} in case all involved caches are replicated.
+         */
+        public boolean isReplicatedOnly() {
+            return replicatedOnly;
+        }
+
+        /**
+         * @return cache identifiers.
+         */
+        public List<Integer> getCacheIds() {
+            return cacheIds;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
index 804f7d8..c845266 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/dml/UpdatePlanBuilder.java
@@ -18,19 +18,26 @@
 package org.apache.ignite.internal.processors.query.h2.dml;
 
 import java.lang.reflect.Constructor;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode;
+import org.apache.ignite.internal.processors.cache.query.SqlFieldsQueryEx;
 import org.apache.ignite.internal.processors.query.GridQueryProperty;
 import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
 import org.apache.ignite.internal.processors.query.IgniteSQLException;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.processors.query.h2.DmlStatementsProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RowDescriptor;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2Table;
 import org.apache.ignite.internal.processors.query.h2.sql.DmlAstUtils;
@@ -41,12 +48,15 @@ import org.apache.ignite.internal.processors.query.h2.sql.GridSqlInsert;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlMerge;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuery;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQueryParser;
+import org.apache.ignite.internal.processors.query.h2.sql.GridSqlQuerySplitter;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSelect;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlStatement;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlTable;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUnion;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlUpdate;
 import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.h2.command.Prepared;
@@ -71,29 +81,39 @@ public final class UpdatePlanBuilder {
      * if available.
      *
      * @param prepared H2's {@link Prepared}.
+     * @param loc Local query flag.
+     * @param idx Indexing.
+     * @param conn Connection.
+     * @param fieldsQuery Original query.
      * @return Update plan.
      */
-    public static UpdatePlan planForStatement(Prepared prepared,
-        @Nullable Integer errKeysPos) throws IgniteCheckedException {
+    public static UpdatePlan planForStatement(Prepared prepared, boolean loc, IgniteH2Indexing idx,
+        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos)
+        throws IgniteCheckedException {
         assert !prepared.isQuery();
 
         GridSqlStatement stmt = new GridSqlQueryParser(false).parse(prepared);
 
         if (stmt instanceof GridSqlMerge || stmt instanceof GridSqlInsert)
-            return planForInsert(stmt);
+            return planForInsert(stmt, loc, idx, conn, fieldsQuery);
         else
-            return planForUpdate(stmt, errKeysPos);
+            return planForUpdate(stmt, loc, idx, conn, fieldsQuery, errKeysPos);
     }
 
     /**
      * Prepare update plan for INSERT or MERGE.
      *
      * @param stmt INSERT or MERGE statement.
+     * @param loc Local query flag.
+     * @param idx Indexing.
+     * @param conn Connection.
+     * @param fieldsQuery Original query.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
     @SuppressWarnings("ConstantConditions")
-    private static UpdatePlan planForInsert(GridSqlStatement stmt) throws IgniteCheckedException {
+    private static UpdatePlan planForInsert(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx,
+        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery) throws IgniteCheckedException {
         GridSqlQuery sel;
 
         GridSqlElement target;
@@ -191,23 +211,33 @@ public final class UpdatePlanBuilder {
         KeyValueSupplier keySupplier = createSupplier(cctx, desc.type(), keyColIdx, hasKeyProps, true, false);
         KeyValueSupplier valSupplier = createSupplier(cctx, desc.type(), valColIdx, hasValProps, false, false);
 
+        String selectSql = sel.getSQL();
+
+        UpdatePlan.DistributedPlanInfo distributed = (rowsNum == 0 && !F.isEmpty(selectSql)) ?
+            checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName()) : null;
+
         if (stmt instanceof GridSqlMerge)
             return UpdatePlan.forMerge(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx,
-                valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum);
+                valColIdx, selectSql, !isTwoStepSubqry, rowsNum, distributed);
         else
             return UpdatePlan.forInsert(tbl.dataTable(), colNames, colTypes, keySupplier, valSupplier, keyColIdx,
-                valColIdx, sel.getSQL(), !isTwoStepSubqry, rowsNum);
+                valColIdx, selectSql, !isTwoStepSubqry, rowsNum, distributed);
     }
 
     /**
      * Prepare update plan for UPDATE or DELETE.
      *
      * @param stmt UPDATE or DELETE statement.
+     * @param loc Local query flag.
+     * @param idx Indexing.
+     * @param conn Connection.
+     * @param fieldsQuery Original query.
      * @param errKeysPos index to inject param for re-run keys at. Null if it's not a re-run plan.
      * @return Update plan.
      * @throws IgniteCheckedException if failed.
      */
-    private static UpdatePlan planForUpdate(GridSqlStatement stmt, @Nullable Integer errKeysPos)
+    private static UpdatePlan planForUpdate(GridSqlStatement stmt, boolean loc, IgniteH2Indexing idx,
+        @Nullable Connection conn, @Nullable SqlFieldsQuery fieldsQuery, @Nullable Integer errKeysPos)
         throws IgniteCheckedException {
         GridSqlElement target;
 
@@ -286,12 +316,23 @@ public final class UpdatePlanBuilder {
 
                 sel = DmlAstUtils.selectForUpdate((GridSqlUpdate) stmt, errKeysPos);
 
-                return UpdatePlan.forUpdate(gridTbl, colNames, colTypes, newValSupplier, valColIdx, sel.getSQL());
+                String selectSql = sel.getSQL();
+
+                UpdatePlan.DistributedPlanInfo distributed = F.isEmpty(selectSql) ? null :
+                    checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName());
+
+                return UpdatePlan.forUpdate(gridTbl, colNames, colTypes, newValSupplier, valColIdx, selectSql,
+                    distributed);
             }
             else {
                 sel = DmlAstUtils.selectForDelete((GridSqlDelete) stmt, errKeysPos);
 
-                return UpdatePlan.forDelete(gridTbl, sel.getSQL());
+                String selectSql = sel.getSQL();
+
+                UpdatePlan.DistributedPlanInfo distributed = F.isEmpty(selectSql) ? null :
+                    checkPlanCanBeDistributed(idx, conn, fieldsQuery, loc, selectSql, tbl.dataTable().cacheName());
+
+                return UpdatePlan.forDelete(gridTbl, selectSql, distributed);
             }
         }
     }
@@ -494,6 +535,62 @@ public final class UpdatePlanBuilder {
     }
 
     /**
+     * Checks whether the given update plan can be distributed and returns additional info.
+     *
+     * @param idx Indexing.
+     * @param conn Connection.
+     * @param fieldsQry Initial update query.
+     * @param loc Local query flag.
+     * @param selectQry Derived select query.
+     * @param cacheName Cache name.
+     * @return distributed update plan info, or {@code null} if cannot be distributed.
+     * @throws IgniteCheckedException if failed.
+     */
+    private static UpdatePlan.DistributedPlanInfo checkPlanCanBeDistributed(IgniteH2Indexing idx,
+        Connection conn, SqlFieldsQuery fieldsQry, boolean loc, String selectQry, String cacheName)
+        throws IgniteCheckedException {
+
+        if (loc || !isSkipReducerOnUpdateQuery(fieldsQry))
+            return null;
+
+        assert conn != null;
+
+        try {
+            // Get a new prepared statement for derived select query.
+            try (PreparedStatement stmt = conn.prepareStatement(selectQry)) {
+                idx.bindParameters(stmt, F.asList(fieldsQry.getArgs()));
+
+                GridCacheTwoStepQuery qry = GridSqlQuerySplitter.split(conn,
+                    GridSqlQueryParser.prepared(stmt),
+                    fieldsQry.getArgs(),
+                    fieldsQry.isCollocated(),
+                    fieldsQry.isDistributedJoins(),
+                    fieldsQry.isEnforceJoinOrder(), idx);
+
+                boolean distributed = qry.skipMergeTable() &&  qry.mapQueries().size() == 1 &&
+                    !qry.mapQueries().get(0).hasSubQueries();
+
+                return distributed ? new UpdatePlan.DistributedPlanInfo(qry.isReplicatedOnly(),
+                    idx.collectCacheIds(CU.cacheId(cacheName), qry)): null;
+            }
+        }
+        catch (SQLException e) {
+            throw new IgniteCheckedException(e);
+        }
+    }
+
+    /**
+     * Checks whether query flags are compatible with server side update.
+     *
+     * @param qry Query.
+     * @return {@code true} if update can be distributed.
+     */
+    public static boolean isSkipReducerOnUpdateQuery(SqlFieldsQuery qry) {
+        return qry != null && !qry.isLocal() &&
+            qry instanceof SqlFieldsQueryEx && ((SqlFieldsQueryEx)qry).isSkipReducerOnUpdate();
+    }
+
+    /**
      * Simple supplier that just takes specified element of a given row.
      */
     private final static class PlainValueSupplier implements KeyValueSupplier {

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 7f28203..c96b486 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -1509,6 +1509,19 @@ public class GridSqlQuerySplitter {
             rdcQry.distinct(true);
         }
 
+        // -- SUB-QUERIES
+        boolean hasSubQueries = hasSubQueries(mapQry.where()) || hasSubQueries(mapQry.from());
+
+        if (!hasSubQueries) {
+            for (int i = 0; i < mapQry.columns(false).size(); i++) {
+                if (hasSubQueries(mapQry.column(i))) {
+                    hasSubQueries = true;
+
+                    break;
+                }
+            }
+        }
+
         // Replace the given select with generated reduce query in the parent.
         prnt.child(childIdx, rdcQry);
 
@@ -1519,6 +1532,7 @@ public class GridSqlQuerySplitter {
         map.columns(collectColumns(mapExps));
         map.sortColumns(mapQry.sort());
         map.partitioned(hasPartitionedTables(mapQry));
+        map.hasSubQueries(hasSubQueries);
 
         if (map.isPartitioned())
             map.derivedPartitions(derivePartitionsFromQuery(mapQry, ctx));
@@ -1543,6 +1557,25 @@ public class GridSqlQuerySplitter {
     }
 
     /**
+     * @param ast Map query AST.
+     * @return {@code true} If the given AST has sub-queries.
+     */
+    private boolean hasSubQueries(GridSqlAst ast) {
+        if (ast == null)
+            return false;
+
+        if (ast instanceof GridSqlSubquery)
+            return true;
+
+        for (int childIdx = 0; childIdx < ast.size(); childIdx++) {
+            if (hasSubQueries(ast.child(childIdx)))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
      * @param sqlQry Query.
      * @param qryAst Select AST.
      * @param params All parameters.

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
new file mode 100644
index 0000000..a783b8a
--- /dev/null
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/DistributedUpdateRun.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.UUID;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+
+/**
+ * Context for DML operation on reducer node.
+ */
+class DistributedUpdateRun {
+    /** Expected number of responses. */
+    private final int nodeCount;
+
+    /** Registers nodes that have responded. */
+    private final HashSet<UUID> rspNodes;
+
+    /** Accumulates total number of updated rows. */
+    private long updCntr = 0L;
+
+    /** Accumulates error keys. */
+    private HashSet<Object> errorKeys;
+
+    /** Query info. */
+    private final GridRunningQueryInfo qry;
+
+    /** Result future. */
+    private final GridFutureAdapter<UpdateResult> fut = new GridFutureAdapter<>();
+
+    /**
+     * Constructor.
+     *
+     * @param nodeCount Number of nodes to await results from.
+     * @param qry Query info.
+     */
+    DistributedUpdateRun(int nodeCount, GridRunningQueryInfo qry) {
+        this.nodeCount = nodeCount;
+        this.qry = qry;
+
+        rspNodes = new HashSet<>(nodeCount);
+    }
+
+    /**
+     * @return Query info.
+     */
+    GridRunningQueryInfo queryInfo() {
+        return qry;
+    }
+
+    /**
+     * @return Result future.
+     */
+    GridFutureAdapter<UpdateResult> future() {
+        return fut;
+    }
+
+    /**
+     * Handle disconnection.
+     * @param e Pre-formatted error.
+     */
+    void handleDisconnect(CacheException e) {
+        fut.onDone(new IgniteCheckedException("Update failed because client node have disconnected.", e));
+    }
+
+    /**
+     * Handle leave of a node.
+     *
+     * @param nodeId Node id.
+     */
+    void handleNodeLeft(UUID nodeId) {
+        fut.onDone(new IgniteCheckedException("Update failed because map node left topology [nodeId=" + nodeId + "]"));
+    }
+
+    /**
+     * Handle response from remote node.
+     *
+     * @param id Node id.
+     * @param msg Response message.
+     */
+    void handleResponse(UUID id, GridH2DmlResponse msg) {
+        synchronized (this) {
+            if (!rspNodes.add(id))
+                return; // ignore duplicated messages
+
+            String err = msg.error();
+
+            if (err != null) {
+                fut.onDone(new IgniteCheckedException("Update failed. " + (F.isEmpty(err) ? "" : err) + "[reqId=" +
+                    msg.requestId() + ", node=" + id + "]."));
+
+                return;
+            }
+
+            if (!F.isEmpty(msg.errorKeys())) {
+                List<Object> errList = Arrays.asList(msg.errorKeys());
+
+                if (errorKeys == null)
+                    errorKeys = new HashSet<>(errList);
+                else
+                    errorKeys.addAll(errList);
+            }
+
+            updCntr += msg.updateCounter();
+
+            if (rspNodes.size() == nodeCount)
+                fut.onDone(new UpdateResult(updCntr, errorKeys == null ? null : errorKeys.toArray()));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 0cc4172..77b928f 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -21,6 +21,7 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.util.AbstractCollection;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
@@ -30,12 +31,14 @@ import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.CacheException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.query.QueryCancelledException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheQueryExecutedEvent;
 import org.apache.ignite.events.DiscoveryEvent;
@@ -54,8 +57,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservabl
 import org.apache.ignite.internal.processors.cache.query.CacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.opt.DistributedJoinMode;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
@@ -63,6 +68,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.typedef.CI1;
@@ -71,6 +78,7 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.thread.IgniteThread;
+import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.h2.jdbc.JdbcResultSet;
 import org.h2.value.Value;
 import org.jetbrains.annotations.Nullable;
@@ -208,6 +216,8 @@ public class GridMapQueryExecutor {
                 onNextPageRequest(node, (GridQueryNextPageRequest)msg);
             else if (msg instanceof GridQueryCancelRequest)
                 onCancel(node, (GridQueryCancelRequest)msg);
+            else if (msg instanceof GridH2DmlRequest)
+                onDmlRequest(node, (GridH2DmlRequest)msg);
             else
                 processed = false;
 
@@ -735,6 +745,102 @@ public class GridMapQueryExecutor {
 
     /**
      * @param node Node.
+     * @param req DML request.
+     */
+    private void onDmlRequest(final ClusterNode node, final GridH2DmlRequest req) throws IgniteCheckedException {
+        int[] parts = req.queryPartitions();
+
+        List<Integer> cacheIds = req.caches();
+
+        long reqId = req.requestId();
+
+        AffinityTopologyVersion topVer = req.topologyVersion();
+
+        List<GridReservable> reserved = new ArrayList<>();
+
+        if (!reservePartitions(cacheIds, topVer, parts, reserved)) {
+            U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() +
+                ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds +
+                ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']');
+
+            sendUpdateResponse(node, reqId, null, "Failed to reserve partitions for DML request. " +
+                "Explanation (Retry your request when re-balancing is over).");
+
+            return;
+        }
+
+        MapNodeResults nodeResults = resultsForNode(node.id());
+
+        try {
+            IndexingQueryFilter filter = h2.backupFilter(topVer, parts);
+
+            GridQueryCancel cancel = nodeResults.putUpdate(reqId);
+
+            SqlFieldsQuery fldsQry = new SqlFieldsQuery(req.query());
+
+            if (req.parameters() != null)
+                fldsQry.setArgs(req.parameters());
+
+            fldsQry.setEnforceJoinOrder(req.isFlagSet(GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER));
+            fldsQry.setTimeout(req.timeout(), TimeUnit.MILLISECONDS);
+            fldsQry.setPageSize(req.pageSize());
+            fldsQry.setLocal(true);
+
+            boolean local = true;
+
+            final boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
+
+            if (!replicated && !F.isEmpty(cacheIds) &&
+                findFirstPartitioned(cacheIds).config().getQueryParallelism() > 1) {
+                fldsQry.setDistributedJoins(true);
+
+                local = false;
+            }
+
+            UpdateResult updRes = h2.mapDistributedUpdate(req.schemaName(), fldsQry, filter, cancel, local);
+
+            GridCacheContext<?, ?> mainCctx =
+                !F.isEmpty(cacheIds) ? ctx.cache().context().cacheContext(cacheIds.get(0)) : null;
+
+            boolean evt = local && mainCctx != null && ctx.event().isRecordable(EVT_CACHE_QUERY_EXECUTED);
+
+            if (evt) {
+                ctx.event().record(new CacheQueryExecutedEvent<>(
+                    node,
+                    "SQL query executed.",
+                    EVT_CACHE_QUERY_EXECUTED,
+                    CacheQueryType.SQL.name(),
+                    mainCctx.name(),
+                    null,
+                    req.query(),
+                    null,
+                    null,
+                    req.parameters(),
+                    node.id(),
+                    null));
+            }
+
+            sendUpdateResponse(node, reqId, updRes, null);
+        }
+        catch (Exception e) {
+            U.error(log, "Error processing dml request. [localNodeId=" + ctx.localNodeId() +
+                ", nodeId=" + node.id() + ", req=" + req + ']', e);
+
+            sendUpdateResponse(node, reqId, null, e.getMessage());
+        }
+        finally {
+            if (!F.isEmpty(reserved)) {
+                // Release reserved partitions.
+                for (int i = 0; i < reserved.size(); i++)
+                    reserved.get(i).release();
+            }
+
+            nodeResults.removeUpdate(reqId);
+        }
+    }
+
+    /**
+     * @param node Node.
      * @param qryReqId Query request ID.
      * @param err Error.
      */
@@ -758,6 +864,36 @@ public class GridMapQueryExecutor {
     }
 
     /**
+     * Sends update response for DML request.
+     *
+     * @param node Node.
+     * @param reqId Request id.
+     * @param updResult Update result.
+     * @param error Error message.
+     */
+    @SuppressWarnings("deprecation")
+    private void sendUpdateResponse(ClusterNode node, long reqId, UpdateResult updResult, String error) {
+        try {
+            GridH2DmlResponse rsp = new GridH2DmlResponse(reqId, updResult == null ? 0 : updResult.counter(),
+                updResult == null ? null : updResult.errorKeys(), error);
+
+            if (log.isDebugEnabled())
+                log.debug("Sending: [localNodeId=" + ctx.localNodeId() + ", node=" + node.id() + ", msg=" + rsp + "]");
+
+            if (node.isLocal())
+                h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), rsp);
+            else {
+                rsp.marshall(ctx.config().getMarshaller());
+
+                ctx.io().sendToGridTopic(node, GridTopic.TOPIC_QUERY, rsp, QUERY_POOL);
+            }
+        }
+        catch (Exception e) {
+            U.error(log, "Failed to send message.", e);
+        }
+    }
+
+    /**
      * @param node Node.
      * @param req Request.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 8638794..f85cd94 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -59,6 +60,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
+import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCacheObjectsIterator;
@@ -67,6 +69,7 @@ import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
 import org.apache.ignite.internal.processors.query.h2.H2FieldsIterator;
 import org.apache.ignite.internal.processors.query.h2.H2Utils;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.UpdateResult;
 import org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryContext;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlSortColumn;
 import org.apache.ignite.internal.processors.query.h2.sql.GridSqlType;
@@ -74,6 +77,8 @@ import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQuery
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryFailResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageRequest;
 import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryNextPageResponse;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2DmlResponse;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
 import org.apache.ignite.internal.util.GridIntIterator;
 import org.apache.ignite.internal.util.GridIntList;
@@ -130,6 +135,9 @@ public class GridReduceQueryExecutor {
     /** */
     private final ConcurrentMap<Long, ReduceQueryRun> runs = new ConcurrentHashMap8<>();
 
+    /** Contexts of running DML requests. */
+    private final ConcurrentMap<Long, DistributedUpdateRun> updRuns = new ConcurrentHashMap<>();
+
     /** */
     private volatile List<GridThreadLocalTable> fakeTbls = Collections.emptyList();
 
@@ -197,6 +205,10 @@ public class GridReduceQueryExecutor {
                         }
                     }
                 }
+
+                for (DistributedUpdateRun r : updRuns.values())
+                    r.handleNodeLeft(nodeId);
+
             }
         }, EventType.EVT_NODE_FAILED, EventType.EVT_NODE_LEFT);
     }
@@ -229,6 +241,8 @@ public class GridReduceQueryExecutor {
                 onNextPage(node, (GridQueryNextPageResponse)msg);
             else if (msg instanceof GridQueryFailResponse)
                 onFail(node, (GridQueryFailResponse)msg);
+            else if (msg instanceof GridH2DmlResponse)
+                onDmlResponse(node, (GridH2DmlResponse)msg);
             else
                 processed = false;
 
@@ -575,25 +589,11 @@ public class GridReduceQueryExecutor {
             if (qry.isLocal())
                 nodes = singletonList(ctx.discovery().localNode());
             else {
-                if (isPreloadingActive(cacheIds)) {
-                    if (isReplicatedOnly)
-                        nodes = replicatedUnstableDataNodes(cacheIds);
-                    else {
-                        partsMap = partitionedUnstableDataNodes(cacheIds);
-
-                        if (partsMap != null) {
-                            qryMap = narrowForQuery(partsMap, parts);
-
-                            nodes = qryMap == null ? null : qryMap.keySet();
-                        }
-                    }
-                }
-                else {
-                    qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts);
+                NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
 
-                    if (qryMap != null)
-                        nodes = qryMap.keySet();
-                }
+                nodes = nodesParts.nodes();
+                partsMap = nodesParts.partitionsMap();
+                qryMap = nodesParts.queryPartitionsMap();
 
                 if (nodes == null)
                     continue; // Retry.
@@ -845,6 +845,153 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     *
+     * @param schemaName Schema name.
+     * @param cacheIds Cache ids.
+     * @param selectQry Select query.
+     * @param params SQL parameters.
+     * @param enforceJoinOrder Enforce join order of tables.
+     * @param pageSize Page size.
+     * @param timeoutMillis Timeout.
+     * @param parts Partitions.
+     * @param isReplicatedOnly Whether query uses only replicated caches.
+     * @param cancel Cancel state.
+     * @return Update result, or {@code null} when some map node doesn't support distributed DML.
+     */
+    public UpdateResult update(
+        String schemaName,
+        List<Integer> cacheIds,
+        String selectQry,
+        Object[] params,
+        boolean enforceJoinOrder,
+        int pageSize,
+        int timeoutMillis,
+        final int[] parts,
+        boolean isReplicatedOnly,
+        GridQueryCancel cancel
+    ) {
+        AffinityTopologyVersion topVer = h2.readyTopologyVersion();
+
+        NodesForPartitionsResult nodesParts = nodesForPartitions(cacheIds, topVer, parts, isReplicatedOnly);
+
+        final long reqId = qryIdGen.incrementAndGet();
+
+        final GridRunningQueryInfo qryInfo = new GridRunningQueryInfo(reqId, selectQry, GridCacheQueryType.SQL_FIELDS,
+            schemaName, U.currentTimeMillis(), cancel, false);
+
+        Collection<ClusterNode> nodes = nodesParts.nodes();
+
+        if (nodes == null)
+            throw new CacheException("Failed to determine nodes participating in the update. " +
+                "Explanation (Retry update once topology recovers).");
+
+        if (isReplicatedOnly) {
+            ClusterNode locNode = ctx.discovery().localNode();
+
+            if (nodes.contains(locNode))
+                nodes = singletonList(locNode);
+            else
+                nodes = singletonList(F.rand(nodes));
+        }
+
+        for (ClusterNode n : nodes) {
+            if (!n.version().greaterThanEqual(2, 3, 0)) {
+                log.warning("Server-side DML optimization is skipped because map node does not support it. " +
+                    "Falling back to normal DML. [node=" + n.id() + ", v=" + n.version() + "].");
+
+                return null;
+            }
+        }
+
+        final DistributedUpdateRun r = new DistributedUpdateRun(nodes.size(), qryInfo);
+
+        int flags = enforceJoinOrder ? GridH2QueryRequest.FLAG_ENFORCE_JOIN_ORDER : 0;
+
+        if (isReplicatedOnly)
+            flags |= GridH2QueryRequest.FLAG_REPLICATED;
+
+        GridH2DmlRequest req = new GridH2DmlRequest()
+            .requestId(reqId)
+            .topologyVersion(topVer)
+            .caches(cacheIds)
+            .schemaName(schemaName)
+            .query(selectQry)
+            .pageSize(pageSize)
+            .parameters(params)
+            .timeout(timeoutMillis)
+            .flags(flags);
+
+        updRuns.put(reqId, r);
+
+        boolean release = false;
+
+        try {
+            Map<ClusterNode, IntArray> partsMap = (nodesParts.queryPartitionsMap() != null) ?
+                nodesParts.queryPartitionsMap() : nodesParts.partitionsMap();
+
+            ExplicitPartitionsSpecializer partsSpec = (parts == null) ? null :
+                new ExplicitPartitionsSpecializer(partsMap);
+
+            final Collection<ClusterNode> finalNodes = nodes;
+
+            cancel.set(new Runnable() {
+                @Override public void run() {
+                    r.future().onCancelled();
+
+                    send(finalNodes, new GridQueryCancelRequest(reqId), null, false);
+                }
+            });
+
+            // send() logs the debug message
+            if (send(nodes, req, partsSpec, false))
+                return r.future().get();
+
+            throw new CacheException("Failed to send update request to participating nodes.");
+        }
+        catch (IgniteCheckedException | RuntimeException e) {
+            release = true;
+
+            U.error(log, "Error during update [localNodeId=" + ctx.localNodeId() + "]", e);
+
+            throw new CacheException("Failed to run update. " + e.getMessage(), e);
+        }
+        finally {
+            if (release)
+                send(nodes, new GridQueryCancelRequest(reqId), null, false);
+
+            if (!updRuns.remove(reqId, r))
+                U.warn(log, "Update run was already removed: " + reqId);
+        }
+    }
+
+    /**
+     * Process response for DML request.
+     *
+     * @param node Node.
+     * @param msg Message.
+     */
+    private void onDmlResponse(final ClusterNode node, GridH2DmlResponse msg) {
+        try {
+            long reqId = msg.requestId();
+
+            DistributedUpdateRun r = updRuns.get(reqId);
+
+            if (r == null) {
+                U.warn(log, "Unexpected dml response (will ignore). [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
+                    node.id() + ", msg=" + msg.toString() + ']');
+
+                return;
+            }
+
+            r.handleResponse(node.id(), msg);
+        }
+        catch (Exception e) {
+            U.error(log, "Error in dml response processing. [localNodeId=" + ctx.localNodeId() + ", nodeId=" +
+                node.id() + ", msg=" + msg.toString() + ']', e);
+        }
+    }
+
+    /**
      * @param cacheIds Cache IDs.
      * @return The first partitioned cache context.
      */
@@ -1309,6 +1456,44 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * Evaluates nodes and nodes to partitions map given a list of cache ids, topology version and partitions.
+     *
+     * @param cacheIds Cache ids.
+     * @param topVer Topology version.
+     * @param parts Partitions array.
+     * @param isReplicatedOnly Allow only replicated caches.
+     * @return Result.
+     */
+    private NodesForPartitionsResult nodesForPartitions(List<Integer> cacheIds, AffinityTopologyVersion topVer,
+        int[] parts, boolean isReplicatedOnly) {
+        Collection<ClusterNode> nodes = null;
+        Map<ClusterNode, IntArray> partsMap = null;
+        Map<ClusterNode, IntArray> qryMap = null;
+
+        if (isPreloadingActive(cacheIds)) {
+            if (isReplicatedOnly)
+                nodes = replicatedUnstableDataNodes(cacheIds);
+            else {
+                partsMap = partitionedUnstableDataNodes(cacheIds);
+
+                if (partsMap != null) {
+                    qryMap = narrowForQuery(partsMap, parts);
+
+                    nodes = qryMap == null ? null : qryMap.keySet();
+                }
+            }
+        }
+        else {
+            qryMap = stableDataNodes(isReplicatedOnly, topVer, cacheIds, parts);
+
+            if (qryMap != null)
+                nodes = qryMap.keySet();
+        }
+
+        return new NodesForPartitionsResult(nodes, partsMap, qryMap);
+    }
+
+    /**
      * @param conn Connection.
      * @param qry Query.
      * @param explain Explain.
@@ -1403,6 +1588,9 @@ public class GridReduceQueryExecutor {
 
         for (Map.Entry<Long, ReduceQueryRun> e : runs.entrySet())
             e.getValue().disconnected(err);
+
+        for (DistributedUpdateRun r: updRuns.values())
+            r.handleDisconnect(err);
     }
 
     /**
@@ -1421,6 +1609,11 @@ public class GridReduceQueryExecutor {
                 res.add(run.queryInfo());
         }
 
+        for (DistributedUpdateRun upd: updRuns.values()) {
+            if (upd.queryInfo().longQuery(curTime, duration))
+                res.add(upd.queryInfo());
+        }
+
         return res;
     }
 
@@ -1435,6 +1628,12 @@ public class GridReduceQueryExecutor {
 
             if (run != null)
                 run.queryInfo().cancel();
+            else {
+                DistributedUpdateRun upd = updRuns.get(qryId);
+
+                if (upd != null)
+                    upd.queryInfo().cancel();
+            }
         }
     }
 
@@ -1478,11 +1677,64 @@ public class GridReduceQueryExecutor {
 
         /** {@inheritDoc} */
         @Override public Message apply(ClusterNode node, Message msg) {
-            GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg);
+            if (msg instanceof GridH2QueryRequest) {
+                GridH2QueryRequest rq = new GridH2QueryRequest((GridH2QueryRequest)msg);
+
+                rq.queryPartitions(toArray(partsMap.get(node)));
+
+                return rq;
+            } else if (msg instanceof GridH2DmlRequest) {
+                GridH2DmlRequest rq = new GridH2DmlRequest((GridH2DmlRequest)msg);
+
+                rq.queryPartitions(toArray(partsMap.get(node)));
+
+                return rq;
+            }
+
+            return msg;
+        }
+    }
+
+    /**
+     * Result of nodes to partitions mapping for a query or update.
+     */
+    static class NodesForPartitionsResult {
+        /** */
+        final Collection<ClusterNode> nodes;
 
-            rq.queryPartitions(toArray(partsMap.get(node)));
+        /** */
+        final Map<ClusterNode, IntArray> partsMap;
 
-            return rq;
+        /** */
+        final Map<ClusterNode, IntArray> qryMap;
+
+        /** */
+        NodesForPartitionsResult(Collection<ClusterNode> nodes, Map<ClusterNode, IntArray> partsMap,
+            Map<ClusterNode, IntArray> qryMap) {
+            this.nodes = nodes;
+            this.partsMap = partsMap;
+            this.qryMap = qryMap;
+        }
+
+        /**
+         * @return Collection of nodes a message shall be sent to.
+         */
+        Collection<ClusterNode> nodes() {
+            return nodes;
+        }
+
+        /**
+         * @return Maps a node to partition array.
+         */
+        Map<ClusterNode, IntArray> partitionsMap() {
+            return partsMap;
+        }
+
+        /**
+         * @return Maps a node to partition array.
+         */
+        Map<ClusterNode, IntArray> queryPartitionsMap() {
+            return qryMap;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/ae02a1d3/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
index 2d20c8d..c0637ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/MapNodeResults.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -32,6 +33,9 @@ class MapNodeResults {
     /** */
     private final ConcurrentMap<MapRequestKey, MapQueryResults> res = new ConcurrentHashMap8<>();
 
+    /** Cancel state for update requests. */
+    private final ConcurrentMap<Long, GridQueryCancel> updCancels = new ConcurrentHashMap8<>();
+
     /** */
     private final GridBoundedConcurrentLinkedHashMap<Long, Boolean> qryHist =
         new GridBoundedConcurrentLinkedHashMap<>(1024, 1024, 0.75f, 64, PER_SEGMENT_Q);
@@ -88,6 +92,12 @@ class MapNodeResults {
                     removed.cancel(true);
             }
         }
+
+        // Cancel update request
+        GridQueryCancel updCancel = updCancels.remove(reqId);
+
+        if (updCancel != null)
+            updCancel.cancel();
     }
 
     /**
@@ -111,11 +121,34 @@ class MapNodeResults {
     }
 
     /**
+     * @param reqId Request id.
+     * @return Cancel state.
+     */
+    public GridQueryCancel putUpdate(long reqId) {
+        GridQueryCancel cancel = new GridQueryCancel();
+
+        updCancels.put(reqId, cancel);
+
+        return cancel;
+    }
+
+    /**
+     * @param reqId Request id.
+     */
+    public void removeUpdate(long reqId) {
+        updCancels.remove(reqId);
+    }
+
+    /**
      * Cancel all node queries.
      */
     public void cancelAll() {
         for (MapQueryResults ress : res.values())
             ress.cancel(true);
+
+        // Cancel update requests
+        for (GridQueryCancel upd: updCancels.values())
+            upd.cancel();
     }
 
 }


Mime
View raw message