ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject ignite git commit: IGNITE-9141: SQL: pass error message from mapper to reducer in case of mapping failure. This closes #4536.
Date Tue, 04 Sep 2018 08:57:16 GMT
Repository: ignite
Updated Branches:
  refs/heads/master 08c563a14 -> 525a77547


IGNITE-9141: SQL: pass error message from mapper to reducer in case of mapping failure. This closes #4536.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/525a7754
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/525a7754
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/525a7754

Branch: refs/heads/master
Commit: 525a77547d7f98b1a88ba98f60f328da3aa4947a
Parents: 08c563a
Author: SGrimstad <sgrimstad@gridgain.com>
Authored: Tue Sep 4 11:57:07 2018 +0300
Committer: devozerov <vozerov@gridgain.com>
Committed: Tue Sep 4 11:57:07 2018 +0300

----------------------------------------------------------------------
 .../messages/GridQueryNextPageResponse.java     |  35 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  | 131 +++---
 .../h2/twostep/GridReduceQueryExecutor.java     |  75 ++--
 .../query/h2/twostep/GridResultPage.java        |   2 -
 .../query/h2/twostep/ReduceQueryRun.java        | 117 +++++-
 ...sappearedCacheCauseRetryMessageSelfTest.java | 134 ++++++
 ...appearedCacheWasNotFoundMessageSelfTest.java | 123 ++++++
 .../query/h2/twostep/JoinSqlTestHelper.java     | 163 ++++++++
 .../NonCollocatedRetryMessageSelfTest.java      | 146 +++++++
 .../h2/twostep/RetryCauseMessageSelfTest.java   | 416 +++++++++++++++++++
 .../IgniteCacheQuerySelfTestSuite2.java         |   9 +
 11 files changed, 1236 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
index 2d1ec36..6b976c2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryNextPageResponse.java
@@ -67,6 +67,9 @@ public class GridQueryNextPageResponse implements Message {
     /** */
     private AffinityTopologyVersion retry;
 
+    /** Retry cause description. */
+    private String retryCause;
+
     /** Last page flag. */
     private boolean last;
 
@@ -235,6 +238,12 @@ public class GridQueryNextPageResponse implements Message {
                 writer.incrementState();
 
             case 9:
+                if (!writer.writeString("retryCause", retryCause))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
                 if (!writer.writeBoolean("removeMapping", removeMapping))
                     return false;
 
@@ -323,13 +332,23 @@ public class GridQueryNextPageResponse implements Message {
                     return false;
 
                 reader.incrementState();
+
             case 9:
+                retryCause = reader.readString("retryCause");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
                 removeMapping = reader.readBoolean("removeMapping");
 
                 if (!reader.isLastRead())
                     return false;
 
                 reader.incrementState();
+
         }
 
         return reader.afterMessageRead(GridQueryNextPageResponse.class);
@@ -342,7 +361,7 @@ public class GridQueryNextPageResponse implements Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 11;
     }
 
     /**
@@ -360,6 +379,20 @@ public class GridQueryNextPageResponse implements Message {
     }
 
     /**
+     * @return Retry cause message.
+     */
+    public String retryCause() {
+        return retryCause;
+    }
+
+    /**
+     * @param retryCause Retry cause message.
+     */
+    public void retryCause(String retryCause){
+        this.retryCause = retryCause;
+    }
+
+    /**
      * @return Last page flag.
      */
     public boolean last() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 2402247..b4d6f00 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -312,10 +312,10 @@ public class GridMapQueryExecutor {
      * @param reserved Reserved list.
      * @param nodeId Node ID.
      * @param reqId Request ID.
-     * @return {@code true} If all the needed partitions successfully reserved.
+     * @return {@code null} on success, or a message describing the cause if reservation failed.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reservePartitions(
+    private String reservePartitions(
         @Nullable List<Integer> cacheIds,
         AffinityTopologyVersion topVer,
         final int[] explicitParts,
@@ -326,7 +326,7 @@ public class GridMapQueryExecutor {
         assert topVer != null;
 
         if (F.isEmpty(cacheIds))
-            return true;
+            return null;
 
         Collection<Integer> partIds = wrap(explicitParts);
 
@@ -335,11 +335,11 @@ public class GridMapQueryExecutor {
 
             // Cache was not found, probably was not deployed yet.
             if (cctx == null) {
-                logRetry("Failed to reserve partitions for query (cache is not found on local node) [" +
-                    "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" +
-                    cacheIds.get(i) + "]");
+                final String res = String.format("Failed to reserve partitions for query (cache is not found on " +
+                    "local node) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s]",
+                    ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i));
 
-                return false;
+                return res;
             }
 
             if (cctx.isLocal() || !cctx.rebalanceEnabled())
@@ -352,13 +352,10 @@ public class GridMapQueryExecutor {
 
             if (explicitParts == null && r != null) { // Try to reserve group partition if any and no explicits.
                 if (r != MapReplicatedReservation.INSTANCE) {
-                    if (!r.reserve()) {
-                        logRetry("Failed to reserve partitions for query (group reservation failed) [" +
-                            "rmtNodeId=" + nodeId + ", reqId=" + reqId + ", affTopVer=" + topVer +
-                            ", cacheId=" + cacheIds.get(i) + ", cacheName=" + cctx.name() + "]");
-
-                        return false; // We need explicit partitions here -> retry.
-                    }
+                    if (!r.reserve())
+                        return String.format("Failed to reserve partitions for query (group " +
+                            "reservation failed) [localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, " +
+                            "cacheName=%s]",ctx.localNodeId(), nodeId, reqId, topVer, cacheIds.get(i), cctx.name());
 
                     reserved.add(r);
                 }
@@ -374,15 +371,21 @@ public class GridMapQueryExecutor {
                             // We don't need to reserve partitions because they will not be evicted in replicated caches.
                             GridDhtPartitionState partState = part != null ? part.state() : null;
 
-                            if (partState != OWNING) {
-                                logRetry("Failed to reserve partitions for query (partition of " +
-                                    "REPLICATED cache is not in OWNING state) [rmtNodeId=" + nodeId +
-                                    ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) +
-                                    ", cacheName=" + cctx.name() + ", part=" + p + ", partFound=" + (part != null) +
-                                    ", partState=" + partState + "]");
-
-                                return false;
-                            }
+                            if (partState != OWNING)
+                                return String.format("Failed to reserve partitions for query " +
+                                    "(partition of REPLICATED cache is not in OWNING state) [" +
+                                    "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, " +
+                                    "part=%s, partFound=%s, partState=%s]",
+                                    ctx.localNodeId(),
+                                    nodeId,
+                                    reqId,
+                                    topVer,
+                                    cacheIds.get(i),
+                                    cctx.name(),
+                                    p,
+                                    (part != null),
+                                    partState
+                                );
                         }
 
                         // Mark that we checked this replicated cache.
@@ -398,29 +401,41 @@ public class GridMapQueryExecutor {
 
                         GridDhtPartitionState partState = part != null ? part.state() : null;
 
-                        if (partState != OWNING || !part.reserve()) {
-                            logRetry("Failed to reserve partitions for query (partition of " +
-                                "PARTITIONED cache cannot be reserved) [rmtNodeId=" + nodeId + ", reqId=" + reqId +
-                                ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) +
-                                ", cacheName=" + cctx.name() + ", part=" + partId + ", partFound=" + (part != null) +
-                                ", partState=" + partState + "]");
-
-                            return false;
-                        }
+                        if (partState != OWNING || !part.reserve())
+                            return String.format("Failed to reserve partitions for query " +
+                                "(partition of PARTITIONED cache cannot be reserved) [" +
+                                "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, " +
+                                "part=%s, partFound=%s, partState=%s]",
+                                ctx.localNodeId(),
+                                nodeId,
+                                reqId,
+                                topVer,
+                                cacheIds.get(i),
+                                cctx.name(),
+                                partId,
+                                (part != null),
+                                partState
+                            );
 
                         reserved.add(part);
 
                         // Double check that we are still in owning state and partition contents are not cleared.
                         partState = part.state();
 
-                        if (part.state() != OWNING) {
-                            logRetry("Failed to reserve partitions for query (partition of " +
-                                "PARTITIONED cache is not in OWNING state after reservation) [rmtNodeId=" + nodeId +
-                                ", reqId=" + reqId + ", affTopVer=" + topVer + ", cacheId=" + cacheIds.get(i) +
-                                ", cacheName=" + cctx.name() + ", part=" + partId + ", partState=" + partState + "]");
-
-                            return false;
-                        }
+                        if (part.state() != OWNING)
+                            return String.format("Failed to reserve partitions for query " +
+                                "(partition of PARTITIONED cache is not in OWNING state after reservation) [" +
+                                "localNodeId=%s, rmtNodeId=%s, reqId=%s, affTopVer=%s, cacheId=%s, cacheName=%s, " +
+                                "part=%s, partState=%s]",
+                                ctx.localNodeId(),
+                                nodeId,
+                                reqId,
+                                topVer,
+                                cacheIds.get(i),
+                                cctx.name(),
+                                partId,
+                                partState
+                            );
                     }
 
                     if (explicitParts == null) {
@@ -442,16 +457,7 @@ public class GridMapQueryExecutor {
             }
         }
 
-        return true;
-    }
-
-    /**
-     * Load failed partition reservation.
-     *
-     * @param msg Message.
-     */
-    private void logRetry(String msg) {
-        log.info(msg);
+        return null;
     }
 
     /**
@@ -783,12 +789,14 @@ public class GridMapQueryExecutor {
             // otherwise, their state is protected by locked topology.
             if (topVer != null && txDetails == null) {
                 // Reserve primary for topology version or explicit partitions.
-                if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) {
+                String err = reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId);
+
+                if (!F.isEmpty(err)) {
                     // Unregister lazy worker because re-try may never reach this node again.
                     if (lazy)
                         stopAndUnregisterCurrentLazyWorker();
 
-                    sendRetry(node, reqId, segmentId);
+                    sendRetry(node, reqId, segmentId, err);
 
                     return;
                 }
@@ -975,10 +983,12 @@ public class GridMapQueryExecutor {
             GridH2RetryException retryErr = X.cause(e, GridH2RetryException.class);
 
             if (retryErr != null) {
-                logRetry("Failed to execute non-collocated query (will retry) [nodeId=" + node.id() +
-                    ", reqId=" + reqId + ", errMsg=" + retryErr.getMessage() + ']');
+                final String retryCause = String.format(
+                    "Failed to execute non-collocated query (will retry) [localNodeId=%s, rmtNodeId=%s, reqId=%s, " +
+                    "errMsg=%s]", ctx.localNodeId(), node.id(), reqId, retryErr.getMessage()
+                );
 
-                sendRetry(node, reqId, segmentId);
+                sendRetry(node, reqId, segmentId, retryCause);
             }
             else {
                 U.error(log, "Failed to execute local query.", e);
@@ -1035,13 +1045,15 @@ public class GridMapQueryExecutor {
 
         List<GridReservable> reserved = new ArrayList<>();
 
-        if (!reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId)) {
+        String err = reservePartitions(cacheIds, topVer, parts, reserved, node.id(), reqId);
+
+        if (!F.isEmpty(err)) {
             U.error(log, "Failed to reserve partitions for DML request. [localNodeId=" + ctx.localNodeId() +
                 ", nodeId=" + node.id() + ", reqId=" + req.requestId() + ", cacheIds=" + cacheIds +
                 ", topVer=" + topVer + ", parts=" + Arrays.toString(parts) + ']');
 
-            sendUpdateResponse(node, reqId, null, "Failed to reserve partitions for DML request. " +
-                "Explanation (Retry your request when re-balancing is over).");
+            sendUpdateResponse(node, reqId, null,
+                "Failed to reserve partitions for DML request. " + err);
 
             return;
         }
@@ -1299,7 +1311,7 @@ public class GridMapQueryExecutor {
      * @param reqId Request ID.
      * @param segmentId Index segment ID.
      */
-    private void sendRetry(ClusterNode node, long reqId, int segmentId) {
+    private void sendRetry(ClusterNode node, long reqId, int segmentId, String retryCause) {
         try {
             boolean loc = node.isLocal();
 
@@ -1310,6 +1322,7 @@ public class GridMapQueryExecutor {
                 false);
 
             msg.retry(h2.readyTopologyVersion());
+            msg.retryCause(retryCause);
 
             if (loc)
                 h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 910ad1a..586fb51 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -65,7 +65,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLo
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSelectForUpdateFuture;
 import org.apache.ignite.internal.processors.cache.distributed.near.TxTopologyVersionFuture;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker;
-import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryMarshallable;
 import org.apache.ignite.internal.processors.cache.query.GridCacheQueryType;
 import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery;
@@ -234,8 +233,7 @@ public class GridReduceQueryExecutor {
      * @param nodeId Left node ID.
      */
     private void handleNodeLeft(ReduceQueryRun r, UUID nodeId) {
-        // Will attempt to retry. If reduce query was started it will fail on next page fetching.
-        retry(r, h2.readyTopologyVersion(), nodeId);
+        r.setStateOnNodeLeave(nodeId, h2.readyTopologyVersion());
     }
 
     /**
@@ -287,12 +285,13 @@ public class GridReduceQueryExecutor {
      */
     private void fail(ReduceQueryRun r, UUID nodeId, String msg, byte failCode) {
         if (r != null) {
-            CacheException e = new CacheException("Failed to execute map query on the node: " + nodeId + ", " + msg);
+            CacheException e = new CacheException("Failed to execute map query on remote node [nodeId=" + nodeId +
+                ", errMsg=" + msg + ']');
 
             if (failCode == GridQueryFailResponse.CANCELLED_BY_ORIGINATOR)
                 e.addSuppressed(new QueryCancelledException());
 
-            r.state(e, nodeId);
+            r.setStateOnException(nodeId, e);
         }
     }
 
@@ -300,7 +299,7 @@ public class GridReduceQueryExecutor {
      * @param node Node.
      * @param msg Message.
      */
-    private void onNextPage(final ClusterNode node, GridQueryNextPageResponse msg) {
+    private void onNextPage(final ClusterNode node, final GridQueryNextPageResponse msg) {
         final long qryReqId = msg.queryRequestId();
         final int qry = msg.query();
         final int seg = msg.segmentId();
@@ -319,20 +318,13 @@ public class GridReduceQueryExecutor {
         try {
             page = new GridResultPage(ctx, node.id(), msg) {
                 @Override public void fetchNextPage() {
-                    Object errState = r.state();
+                    if (r.hasErrorOrRetry()) {
+                        if (r.exception() != null)
+                            throw r.exception();
 
-                    if (errState != null) {
-                        CacheException err0 = errState instanceof CacheException ? (CacheException)errState : null;
+                        assert r.retryCause() != null;
 
-                        if (err0 != null && err0.getCause() instanceof IgniteClientDisconnectedException)
-                            throw err0;
-
-                        CacheException e = new CacheException("Failed to fetch data from node: " + node.id());
-
-                        if (err0 != null)
-                            e.addSuppressed(err0);
-
-                        throw e;
+                        throw new CacheException(r.retryCause());
                     }
 
                     try {
@@ -360,7 +352,7 @@ public class GridReduceQueryExecutor {
         idx.addPage(page);
 
         if (msg.retry() != null)
-            retry(r, msg.retry(), node.id());
+            r.setStateOnRetry(node.id(), msg.retry(), msg.retryCause());
         else if (msg.page() == 0) {
             // Do count down on each first page received.
             r.latch().countDown();
@@ -373,20 +365,14 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param r Query run.
-     * @param retryVer Retry version.
-     * @param nodeId Node ID.
-     */
-    private void retry(ReduceQueryRun r, AffinityTopologyVersion retryVer, UUID nodeId) {
-        r.state(retryVer, nodeId);
-    }
-
-    /**
      * @param cacheIds Cache IDs.
      * @return {@code true} If preloading is active.
      */
     private boolean isPreloadingActive(List<Integer> cacheIds) {
         for (Integer cacheId : cacheIds) {
+            if (null == cacheContext(cacheId))
+                throw new CacheException(String.format("Cache not found on local node [cacheId=%d]", cacheId));
+
             if (hasMovingPartitions(cacheContext(cacheId)))
                 return true;
         }
@@ -399,6 +385,8 @@ public class GridReduceQueryExecutor {
      * @return {@code True} If cache has partitions in {@link GridDhtPartitionState#MOVING} state.
      */
     private boolean hasMovingPartitions(GridCacheContext<?, ?> cctx) {
+        assert cctx != null;
+
         return !cctx.isLocal() && cctx.topology().hasMovingPartitions();
     }
 
@@ -593,9 +581,18 @@ public class GridReduceQueryExecutor {
 
         final long startTime = U.currentTimeMillis();
 
+        ReduceQueryRun lastRun = null;
+
         for (int attempt = 0;; attempt++) {
-            if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout))
-                throw new CacheException("Failed to map SQL query to topology.");
+            if (attempt > 0 && retryTimeout > 0 && (U.currentTimeMillis() - startTime > retryTimeout)) {
+                UUID retryNodeId = lastRun.retryNodeId();
+                String retryCause = lastRun.retryCause();
+
+                assert !F.isEmpty(retryCause);
+
+                throw new CacheException("Failed to map SQL query to topology on data node [dataNodeId=" + retryNodeId +
+                    ", msg=" + retryCause + ']');
+            }
 
             if (attempt != 0) {
                 try {
@@ -877,29 +874,26 @@ public class GridReduceQueryExecutor {
                 if (send(nodes, req, spec, false)) {
                     awaitAllReplies(r, nodes, cancel);
 
-                    Object state = r.state();
-
-                    if (state != null) {
-                        if (state instanceof CacheException) {
-                            CacheException err = (CacheException)state;
+                    if (r.hasErrorOrRetry()) {
+                        CacheException err = r.exception();
 
+                        if (err != null) {
                             if (err.getCause() instanceof IgniteClientDisconnectedException)
                                 throw err;
 
                             if (wasCancelled(err))
                                 throw new QueryCancelledException(); // Throw correct exception.
 
-                            throw new CacheException("Failed to run map query remotely." + err.getMessage(), err);
+                            throw new CacheException("Failed to run map query remotely: " + err.getMessage(), err);
                         }
-
-                        if (state instanceof AffinityTopologyVersion) {
+                        else {
                             retry = true;
 
                             // On-the-fly topology change must not be possible in FOR UPDATE case.
                             assert sfuFut == null;
 
                             // If remote node asks us to retry then we have outdated full partition map.
-                            h2.awaitForReadyTopologyVersion((AffinityTopologyVersion)state);
+                            h2.awaitForReadyTopologyVersion(r.retryTopologyVersion());
                         }
                     }
                 }
@@ -952,6 +946,9 @@ public class GridReduceQueryExecutor {
                     }
                 }
                 else {
+                    assert r != null;
+                    lastRun=r;
+
                     if (Thread.currentThread().isInterrupted())
                         throw new IgniteInterruptedCheckedException("Query was interrupted.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
index 3c17640..0cb986b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridResultPage.java
@@ -72,8 +72,6 @@ public class GridResultPage {
             Collection<?> plainRows = res.plainRows();
 
             if (plainRows != null) {
-                assert plainRows instanceof ArrayList;
-
                 rowsInPage = plainRows.size();
 
                 if (rowsInPage == 0 || ((ArrayList<Value[]>)plainRows).get(0).length == res.columns())

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
index df72e8c..7ddd653 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/ReduceQueryRun.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxSe
 import org.apache.ignite.internal.processors.cache.query.GridCacheTwoStepQuery;
 import org.apache.ignite.internal.processors.query.GridQueryCancel;
 import org.apache.ignite.internal.processors.query.GridRunningQueryInfo;
+import org.apache.ignite.internal.util.typedef.F;
 import org.h2.jdbc.JdbcConnection;
 import org.jetbrains.annotations.Nullable;
 
@@ -53,8 +54,8 @@ class ReduceQueryRun {
     /** */
     private final int pageSize;
 
-    /** Can be either CacheException in case of error or AffinityTopologyVersion to retry if needed. */
-    private final AtomicReference<Object> state = new AtomicReference<>();
+    /** */
+    private final AtomicReference<State> state = new AtomicReference<>();
 
     /** Future controlling {@code SELECT FOR UPDATE} query execution. */
     private final GridNearTxSelectForUpdateFuture selectForUpdateFut;
@@ -86,30 +87,59 @@ class ReduceQueryRun {
     }
 
     /**
-     * @param o Fail state object.
+     * Set state on exception.
+     *
+     * @param nodeId Node ID.
+     * @param err Error.
+     */
+    void setStateOnException(@Nullable UUID nodeId, CacheException err) {
+        setState0(new State(nodeId, err, null, null));
+    }
+
+    /**
+     * Set state on map node leave.
+     *
+     * @param nodeId Node ID.
+     * @param topVer Topology version.
+     */
+    void setStateOnNodeLeave(UUID nodeId, AffinityTopologyVersion topVer) {
+        setState0(new State(nodeId, null, topVer, "Data node has left the grid during query execution [nodeId=" +
+            nodeId + ']'));
+    }
+
+    /**
+     * Set state on retry due to mapping failure.
+     *
      * @param nodeId Node ID.
+     * @param topVer Topology version.
+     * @param retryCause Retry cause.
      */
-    void state(Object o, @Nullable UUID nodeId) {
-        assert o != null;
-        assert o instanceof CacheException || o instanceof AffinityTopologyVersion : o.getClass();
+    void setStateOnRetry(UUID nodeId, AffinityTopologyVersion topVer, String retryCause) {
+        assert !F.isEmpty(retryCause);
 
-        if (!state.compareAndSet(null, o))
+        setState0(new State(nodeId, null, topVer, retryCause));
+    }
+
+    /**
+     * Sets the state if it has not been set yet.
+     * @param state State.
+     */
+    private void setState0(State state){
+        if (!this.state.compareAndSet(null, state))
             return;
 
         while (latch.getCount() != 0) // We don't need to wait for all nodes to reply.
             latch.countDown();
 
-        CacheException e = o instanceof CacheException ? (CacheException) o : null;
-
         for (GridMergeIndex idx : idxs) // Fail all merge indexes.
-            idx.fail(nodeId, e);
+            idx.fail(state.nodeId, state.ex);
     }
 
     /**
      * @param e Error.
      */
     void disconnected(CacheException e) {
-        state(e, null);
+        setStateOnException(null, e);
     }
 
     /**
@@ -133,11 +163,45 @@ class ReduceQueryRun {
         return conn;
     }
 
+    /** @return {@code true} if an error or retry state has been set. */
+    boolean hasErrorOrRetry(){
+        return state.get() != null;
+    }
+
     /**
-     * @return State.
+     * @return Exception.
      */
-    Object state() {
-        return state.get();
+    CacheException exception() {
+        State st = state.get();
+
+        return st != null ? st.ex : null;
+    }
+
+    /**
+     * @return Retry topology version.
+     */
+    AffinityTopologyVersion retryTopologyVersion(){
+        State st = state.get();
+
+        return st != null ? st.retryTopVer : null;
+    }
+
+    /**
+     * @return Retry node ID.
+     */
+    UUID retryNodeId() {
+        State st = state.get();
+
+        return st != null ? st.nodeId : null;
+    }
+
+    /**
+     * @return Retry cause.
+     */
+    String retryCause(){
+        State st = state.get();
+
+        return st != null ? st.retryCause : null;
     }
 
     /**
@@ -167,4 +231,29 @@ class ReduceQueryRun {
     @Nullable public GridNearTxSelectForUpdateFuture selectForUpdateFuture() {
         return selectForUpdateFut;
     }
+
+    /**
+     * Error state.
+     */
+    private static class State {
+        /** Affected node (may be null in case of local node failure). */
+        private final UUID nodeId;
+
+        /** Error. */
+        private final CacheException ex;
+
+        /** Retry topology version. */
+        private final AffinityTopologyVersion retryTopVer;
+
+        /** Retry cause. */
+        private final String retryCause;
+
+        /** */
+        private State(UUID nodeId, CacheException ex, AffinityTopologyVersion retryTopVer, String retryCause){
+            this.nodeId = nodeId;
+            this.ex = ex;
+            this.retryTopVer = retryTopVer;
+            this.retryCause = retryCause;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
new file mode 100644
index 0000000..8c4358a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheCauseRetryMessageSelfTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.messages.GridQueryCancelRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Organization;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Person;
+
+/**
+ * Test for the "Failed to reserve partitions for query (cache is not found on local node)" root cause message.
+ */
+public class DisappearedCacheCauseRetryMessageSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int NODES_COUNT = 2;
+    /** */
+    private static final String ORG = "org";
+    /** */
+    private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
+    /** */
+    private IgniteCache<String, JoinSqlTestHelper.Organization> orgCache;
+
+    /** */
+    public void testDisappearedCacheCauseRetryMessage() {
+
+        SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+
+        try {
+            personCache.query(qry).getAll();
+
+            fail("No CacheException emitted.");
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage(), e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) ["));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TcpCommunicationSpi(){
+
+            volatile long reqId = -1;
+            /** {@inheritDoc} */
+            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+                assert msg != null;
+
+                if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){
+                    GridIoMessage gridMsg = (GridIoMessage)msg;
+
+                    if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){
+                        GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message());
+                        reqId = req.requestId();
+                        orgCache.destroy();
+                    }
+                    else if ( GridQueryCancelRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){
+                        GridQueryCancelRequest req = (GridQueryCancelRequest) (gridMsg.message());
+
+                        if (reqId == req.queryRequestId())
+                            orgCache = DisappearedCacheCauseRetryMessageSelfTest.this.ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
+                                .setCacheMode(CacheMode.REPLICATED)
+                                .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class)
+                            );
+
+                    }
+                }
+
+                super.sendMessage(node, msg, ackC);
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000");
+
+        startGridsMultiThreaded(NODES_COUNT, false);
+
+        personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Person>("pers")
+            .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class)
+        );
+
+        orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class)
+        );
+
+        awaitPartitionMapExchange();
+
+        JoinSqlTestHelper.populateDataIntoOrg(orgCache);
+
+        JoinSqlTestHelper.populateDataIntoPerson(personCache);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java
new file mode 100644
index 0000000..9928ed6
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DisappearedCacheWasNotFoundMessageSelfTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Organization;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Person;
+
+/**
+ * Test for the "Grid cache context is not registered for cache id" root cause message.
+ */
+public class DisappearedCacheWasNotFoundMessageSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int NODES_COUNT = 2;
+    /** */
+    private static final String ORG = "org";
+    /** */
+    private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
+    /** */
+    private IgniteCache<String, JoinSqlTestHelper.Organization> orgCache;
+
+    /** */
+    public void testDisappearedCacheWasNotFoundMessage() {
+        SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+
+        try {
+            personCache.query(qry).getAll();
+
+            fail("No CacheException emitted.");
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage(), e.getMessage().contains("Cache not found on local node"));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TcpCommunicationSpi(){
+            /** {@inheritDoc} */
+            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+                assert msg != null;
+
+                if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){
+                    GridIoMessage gridMsg = (GridIoMessage)msg;
+
+                    if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){
+                        GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message());
+
+                        req.requestId();
+
+                        orgCache.destroy();
+                    }
+                }
+
+                super.sendMessage(node, msg, ackC);
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000");
+
+        startGridsMultiThreaded(NODES_COUNT, false);
+
+        personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Person>("pers")
+            .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class)
+        );
+
+        orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
+                .setCacheMode(CacheMode.REPLICATED)
+                .setIndexedTypes(String.class, Organization.class)
+        );
+
+        awaitPartitionMapExchange();
+
+        JoinSqlTestHelper.populateDataIntoOrg(orgCache);
+
+        JoinSqlTestHelper.populateDataIntoPerson(personCache);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java
new file mode 100644
index 0000000..fe7821a
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinSqlTestHelper.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.annotations.QuerySqlField;
+
+/**
+ * Join sql test helper
+ */
+public class JoinSqlTestHelper {
+    /** */
+    private static final int ORG_COUNT = 100;
+
+    /** */
+    private static final int PERSON_PER_ORG_COUNT = 10;
+
+    /** */
+    static final String JOIN_SQL = "select * from Person, \"org\".Organization as org " +
+        "where Person.orgId = org.id " +
+        "and lower(org.name) = lower(?)";
+
+    /**
+     * Populates the organization cache with test data.
+     * @param cache {@link IgniteCache} to populate.
+     */
+    static void populateDataIntoOrg(IgniteCache<String, Organization> cache) {
+        for (int i = 0; i < ORG_COUNT; i++) {
+            Organization org = new Organization();
+
+            org.setId("org" + i);
+
+            org.setName("Organization #" + i);
+
+            cache.put(org.getId(), org);
+        }
+    }
+
+    /**
+     * Populates the person cache with test data.
+     * @param cache {@link IgniteCache} to populate.
+     */
+    static void populateDataIntoPerson(IgniteCache<String, Person> cache) {
+        int personId = 0;
+
+        for (int i = 0; i < ORG_COUNT; i++) {
+            Organization org = new Organization();
+
+            org.setId("org" + i);
+
+            org.setName("Organization #" + i);
+
+            for (int j = 0; j < PERSON_PER_ORG_COUNT; j++) {
+                Person prsn = new Person();
+
+                prsn.setId("pers" + personId);
+
+                prsn.setOrgId(org.getId());
+
+                prsn.setName("Person name #" + personId);
+
+                cache.put(prsn.getId(), prsn);
+
+                personId++;
+            }
+        }
+    }
+
+    /**
+     *
+     */
+    public static class Person {
+        /** */
+        @QuerySqlField(index = true)
+        private String id;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String orgId;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String name;
+
+        /** */
+        public String getId() {
+            return id;
+        }
+
+        /** */
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        /** */
+        public String getOrgId() {
+            return orgId;
+        }
+
+        /** */
+        public void setOrgId(String orgId) {
+            this.orgId = orgId;
+        }
+
+        /** */
+        public String getName() {
+            return name;
+        }
+
+        /** */
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class Organization {
+        /** */
+        @QuerySqlField(index = true)
+        private String id;
+
+        /** */
+        @QuerySqlField(index = true)
+        private String name;
+
+        /** */
+        public void setId(String id) {
+            this.id = id;
+        }
+
+        /** */
+        public String getId() {
+            return id;
+        }
+
+        /** */
+        public String getName() {
+            return name;
+        }
+
+        /** */
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
new file mode 100644
index 0000000..c602225
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/NonCollocatedRetryMessageSelfTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.List;
+import javax.cache.Cache;
+import javax.cache.CacheException;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
+
+/**
+ * Test for the "Failed to execute non-collocated query" root cause message.
+ */
+public class NonCollocatedRetryMessageSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int NODES_COUNT = 3;
+
+    /** */
+    private static final String ORG = "org";
+
+    /** */
+    private IgniteCache<String, JoinSqlTestHelper.Person> personCache;
+
+    /** */
+    public void testNonCollocatedRetryMessage() {
+        SqlQuery<String, JoinSqlTestHelper.Person> qry = new SqlQuery<String, JoinSqlTestHelper.Person>(JoinSqlTestHelper.Person.class, JoinSqlTestHelper.JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+
+        try {
+            List<Cache.Entry<String,JoinSqlTestHelper.Person>> prsns = personCache.query(qry).getAll();
+            fail("No CacheException emitted. Collection size="+prsns.size());
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage(), e.getMessage().contains("Failed to execute non-collocated query"));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TcpCommunicationSpi(){
+            volatile long reqId = -1;
+            /** {@inheritDoc} */
+            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+                assert msg != null;
+
+                if ( GridIoMessage.class.isAssignableFrom(msg.getClass())){
+                    GridIoMessage gridMsg = (GridIoMessage)msg;
+
+                    if ( GridH2QueryRequest.class.isAssignableFrom( gridMsg.message().getClass() ) ){
+                        GridH2QueryRequest req = (GridH2QueryRequest) (gridMsg.message());
+
+                        if (reqId < 0) {
+                            reqId = req.requestId();
+
+                            String shutName = getTestIgniteInstanceName(1);
+
+                            stopGrid(shutName, true, false);
+                        }
+                        else if( reqId != req.requestId() ){
+                            try {
+                                U.sleep(IgniteSystemProperties.getLong(IGNITE_SQL_RETRY_TIMEOUT, GridReduceQueryExecutor.DFLT_RETRY_TIMEOUT));
+                            }
+                            catch (IgniteInterruptedCheckedException e) {
+                                // no-op
+                            }
+                        }
+                    }
+                }
+                super.sendMessage(node, msg, ackC);
+            }
+        });
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi(){
+            public long getNodesJoined() {
+                return stats.joinedNodesCount();
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000");
+
+        startGridsMultiThreaded(NODES_COUNT, false);
+
+        personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Person>("pers")
+            .setBackups(1)
+            .setIndexedTypes(String.class, JoinSqlTestHelper.Person.class)
+        );
+
+        final IgniteCache<String, JoinSqlTestHelper.Organization> orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, JoinSqlTestHelper.Organization>(ORG)
+            .setBackups(1)
+            .setIndexedTypes(String.class, JoinSqlTestHelper.Organization.class)
+        );
+
+        awaitPartitionMapExchange();
+
+        JoinSqlTestHelper.populateDataIntoOrg(orgCache);
+
+        JoinSqlTestHelper.populateDataIntoPerson(personCache);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
new file mode 100644
index 0000000..3269887
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/RetryCauseMessageSelfTest.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.UUID;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.cache.CacheException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
+import org.apache.ignite.internal.processors.query.GridQueryProcessor;
+import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
+import org.apache.ignite.internal.processors.query.h2.opt.GridH2RetryException;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_SQL_RETRY_TIMEOUT;
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.NONE;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.JOIN_SQL;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Organization;
+import static org.apache.ignite.internal.processors.query.h2.twostep.JoinSqlTestHelper.Person;
+
+/**
+ * Tests for the retry-cause messages reported to the caller when query mapping fails (one test per retry case).
+ */
+public class RetryCauseMessageSelfTest extends GridCommonAbstractTest {
+    /** Number of nodes to start. */
+    private static final int NODES_COUNT = 2;
+
+    /** Query selecting all rows from the Organization table. */
+    private static final String ORG_SQL = "select * from Organization";
+
+    /** Name of the Organization cache. */
+    private static final String ORG = "org";
+
+    /** Person cache, created in {@link #beforeTest()}. */
+    private IgniteCache<String, Person> personCache;
+
+    /** Organization cache (REPLICATED), created in {@link #beforeTest()}. */
+    private IgniteCache<String, Organization> orgCache;
+
+    /** H2 indexing of the started node; its map query executor is swapped for a mock by the tests. */
+    private IgniteH2Indexing h2Idx;
+
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 600 * 1000;
+    }
+
+    /**
+     * Verifies the message: Failed to reserve partitions for query (cache is not found on local node).
+     */
+    public void testSynthCacheWasNotFoundMessage() {
+        GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+        GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec",
+            new MockGridMapQueryExecutor(null) {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) {
+                        GridH2QueryRequest qryReq = (GridH2QueryRequest)msg;
+
+                        qryReq.caches().add(Integer.MAX_VALUE);
+
+                        startedExecutor.onMessage(nodeId, msg);
+
+                        qryReq.caches().remove(qryReq.caches().size() - 1);
+                    }
+                    else
+                        startedExecutor.onMessage(nodeId, msg);
+                }
+            }.insertRealExecutor(mapQryExec));
+
+        SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+
+        try {
+            personCache.query(qry).getAll();
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage(), e.getMessage().contains("Failed to reserve partitions for query (cache is not found on local node) ["));
+
+            return;
+        }
+        finally {
+            GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec);
+        }
+        fail();
+    }
+
+    /**
+     * Verifies the message: Failed to reserve partitions for query (group reservation failed).
+     */
+    public void testGrpReservationFailureMessage() {
+        final GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+        final ConcurrentMap<MapReservationKey, GridReservable> reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations");
+
+        GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec",
+            new MockGridMapQueryExecutor(null) {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) {
+                        final MapReservationKey grpKey = new MapReservationKey(ORG, null);
+
+                        reservations.put(grpKey, new GridReservable() {
+
+                            @Override public boolean reserve() {
+                                return false;
+                            }
+
+                            @Override public void release() {}
+                        });
+                    }
+                    startedExecutor.onMessage(nodeId, msg);
+                }
+            }.insertRealExecutor(mapQryExec));
+
+        SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+
+        try {
+            personCache.query(qry).getAll();
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage().contains("Failed to reserve partitions for query (group reservation failed) ["));
+
+            return;
+        }
+        finally {
+            GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec);
+        }
+        fail();
+    }
+
+    /**
+     * Verifies the message: Failed to reserve partitions for query (partition of REPLICATED cache is not in OWNING state).
+     */
+    public void testReplicatedCacheReserveFailureMessage() {
+        GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+        final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx");
+
+        GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec",
+            new MockGridMapQueryExecutor(null) {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) {
+                        GridH2QueryRequest qryReq = (GridH2QueryRequest)msg;
+
+                        GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0));
+
+                        GridDhtLocalPartition part = cctx.topology().localPartition(0, NONE, false);
+
+                        AtomicLong aState = GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "state");
+
+                        long stateVal = aState.getAndSet(2);
+
+                        startedExecutor.onMessage(nodeId, msg);
+
+                        aState.getAndSet(stateVal);
+                    }
+                    else 
+                        startedExecutor.onMessage(nodeId, msg);
+                }
+            }.insertRealExecutor(mapQryExec));
+
+        SqlQuery<String, Organization> qry = new SqlQuery<>(Organization.class, ORG_SQL);
+
+        qry.setDistributedJoins(true);
+        try {
+            orgCache.query(qry).getAll();
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of REPLICATED cache is not in OWNING state) ["));
+
+            return;
+        }
+        finally {
+            GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec);
+        }
+        fail();
+    }
+
+    /**
+     * Verifies the message: Failed to reserve partitions for query (partition of PARTITIONED cache cannot be reserved).
+     */
+    public void testPartitionedCacheReserveFailureMessage() {
+        GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+        final GridKernalContext ctx = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "ctx");
+
+        GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec",
+            new MockGridMapQueryExecutor(null) {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) {
+                        GridH2QueryRequest qryReq = (GridH2QueryRequest)msg;
+
+                        GridCacheContext<?, ?> cctx = ctx.cache().context().cacheContext(qryReq.caches().get(0));
+
+                        GridDhtLocalPartition part = cctx.topology().localPartition(0, NONE, false);
+
+                        AtomicLong aState = GridTestUtils.getFieldValue(part, GridDhtLocalPartition.class, "state");
+
+                        long stateVal = aState.getAndSet(2);
+
+                        startedExecutor.onMessage(nodeId, msg);
+
+                        aState.getAndSet(stateVal);
+                    }
+                    else
+                        startedExecutor.onMessage(nodeId, msg);
+
+                }
+            }.insertRealExecutor(mapQryExec));
+
+        SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+        try {
+            personCache.query(qry).getAll();
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage().contains("Failed to reserve partitions for query (partition of PARTITIONED cache cannot be reserved) ["));
+
+            return;
+        }
+        finally {
+            GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec);
+        }
+        fail();
+    }
+
+    /**
+     * Verifies the message: Failed to execute non-collocated query (will retry).
+     */
+    public void testNonCollocatedFailureMessage() {
+        final GridMapQueryExecutor mapQryExec = GridTestUtils.getFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec");
+
+        final ConcurrentMap<MapReservationKey, GridReservable> reservations = GridTestUtils.getFieldValue(mapQryExec, GridMapQueryExecutor.class, "reservations");
+
+        GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec",
+            new MockGridMapQueryExecutor(null) {
+                @Override public void onMessage(UUID nodeId, Object msg) {
+                    if (GridH2QueryRequest.class.isAssignableFrom(msg.getClass())) {
+                        final MapReservationKey grpKey = new MapReservationKey(ORG, null);
+
+                        reservations.put(grpKey, new GridReservable() {
+
+                            @Override public boolean reserve() {
+                                throw new GridH2RetryException("test retry exception");
+                            }
+
+                            @Override public void release() {
+                            }
+                        });
+                    }
+                    startedExecutor.onMessage(nodeId, msg);
+
+                }
+            }.insertRealExecutor(mapQryExec));
+
+        SqlQuery<String, Person> qry = new SqlQuery<String, Person>(Person.class, JOIN_SQL).setArgs("Organization #0");
+
+        qry.setDistributedJoins(true);
+        try {
+            personCache.query(qry).getAll();
+        }
+        catch (CacheException e) {
+            assertTrue(e.getMessage().contains("Failed to execute non-collocated query (will retry) ["));
+
+            return;
+        }
+        finally {
+            GridTestUtils.setFieldValue(h2Idx, IgniteH2Indexing.class, "mapQryExec", mapQryExec);
+        }
+        fail();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCommunicationSpi(new TcpCommunicationSpi(){
+            /** {@inheritDoc} */
+            @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+                assert msg != null;
+
+                super.sendMessage(node, msg, ackC);
+            }
+        });
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        System.setProperty(IGNITE_SQL_RETRY_TIMEOUT, "5000");
+
+        Ignite ignite = startGridsMultiThreaded(NODES_COUNT, false);
+
+        GridQueryProcessor qryProc = grid(ignite.name()).context().query();
+
+        h2Idx = GridTestUtils.getFieldValue(qryProc, GridQueryProcessor.class, "idx");
+
+        personCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Person>("pers")
+            .setIndexedTypes(String.class, Person.class)
+        );
+
+        orgCache = ignite(0).getOrCreateCache(new CacheConfiguration<String, Organization>(ORG)
+            .setCacheMode(CacheMode.REPLICATED)
+            .setIndexedTypes(String.class, Organization.class)
+        );
+
+        awaitPartitionMapExchange();
+
+        JoinSqlTestHelper.populateDataIntoOrg(orgCache);
+
+        JoinSqlTestHelper.populateDataIntoPerson(personCache);
+    }
+
+    /** Stops all grids and clears the retry timeout override set in {@link #beforeTest()}. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+        System.clearProperty(IGNITE_SQL_RETRY_TIMEOUT);
+    }
+
+    /**
+     * Wrapper around {@link GridMapQueryExecutor} that delegates the overridden calls to the wrapped executor.
+     */
+    private abstract static class MockGridMapQueryExecutor extends GridMapQueryExecutor {
+
+        /**
+         * Wrapped executor that receives the delegated calls.
+         */
+        GridMapQueryExecutor startedExecutor;
+
+        /** Sets the executor to delegate to and returns this wrapper for chaining. */
+        MockGridMapQueryExecutor insertRealExecutor(GridMapQueryExecutor realExecutor) {
+            this.startedExecutor = realExecutor;
+            return this;
+        }
+
+        /**
+         * @param busyLock Busy lock.
+         */
+        MockGridMapQueryExecutor(GridSpinBusyLock busyLock) {
+            super(busyLock);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMessage(UUID nodeId, Object msg) {
+            startedExecutor.onMessage(nodeId, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void cancelLazyWorkers() {
+            startedExecutor.cancelLazyWorkers();
+        }
+
+        /** {@inheritDoc} */
+        @Override GridSpinBusyLock busyLock() {
+            return startedExecutor.busyLock();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCacheStop(String cacheName) {
+            startedExecutor.onCacheStop(cacheName);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void stopAndUnregisterCurrentLazyWorker() {
+            startedExecutor.stopAndUnregisterCurrentLazyWorker();
+        }
+
+        /** {@inheritDoc} */
+        @Override public void unregisterLazyWorker(MapQueryLazyWorker worker) {
+            startedExecutor.unregisterLazyWorker(worker);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int registeredLazyWorkers() {
+            return startedExecutor.registeredLazyWorkers();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/525a7754/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
index 093423d..536834c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite2.java
@@ -51,6 +51,10 @@ import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlDistribut
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexMultiNodeSelfTest;
 import org.apache.ignite.internal.processors.query.IgniteCacheGroupsSqlSegmentedIndexSelfTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.CacheQueryMemoryLeakTest;
+import org.apache.ignite.internal.processors.query.h2.twostep.DisappearedCacheCauseRetryMessageSelfTest;
+import org.apache.ignite.internal.processors.query.h2.twostep.DisappearedCacheWasNotFoundMessageSelfTest;
+import org.apache.ignite.internal.processors.query.h2.twostep.NonCollocatedRetryMessageSelfTest;
+import org.apache.ignite.internal.processors.query.h2.twostep.RetryCauseMessageSelfTest;
 import org.apache.ignite.testframework.IgniteTestSuite;
 
 /**
@@ -110,6 +114,11 @@ public class IgniteCacheQuerySelfTestSuite2 extends TestSuite {
 
         suite.addTestSuite(CacheQueryMemoryLeakTest.class);
 
+        suite.addTestSuite(NonCollocatedRetryMessageSelfTest.class);
+        suite.addTestSuite(RetryCauseMessageSelfTest.class);
+        suite.addTestSuite(DisappearedCacheCauseRetryMessageSelfTest.class);
+        suite.addTestSuite(DisappearedCacheWasNotFoundMessageSelfTest.class);
+
         return suite;
     }
 }


Mime
View raw message