ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: # ignite-901 WIP
Date Mon, 06 Jul 2015 14:39:57 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-901 1a2ed51a4 -> 29c7fa734


# ignite-901 WIP


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/29c7fa73
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/29c7fa73
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/29c7fa73

Branch: refs/heads/ignite-901
Commit: 29c7fa7349e01763f657978a322753ffd8f96984
Parents: 1a2ed51
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jul 6 17:22:30 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jul 6 17:35:34 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContextImpl.java  |  12 +-
 .../apache/ignite/internal/IgniteKernal.java    |   6 +-
 .../datastreamer/DataStreamProcessor.java       |   7 +
 .../datastreamer/DataStreamerImpl.java          |   7 +
 .../processors/service/GridServiceProxy.java    |   3 +
 ...IgniteClientReconnectDiscoveryStateTest.java |   8 +-
 .../IgniteClientReconnectFailoverSelfTest.java  | 290 -------------------
 .../IgniteClientReconnectFailoverTest.java      | 290 +++++++++++++++++++
 .../IgniteClientReconnectServicesTest.java      |   4 +-
 .../IgniteClientReconnectStreamerTest.java      |   1 +
 .../IgniteClientReconnectTestSuite.java         |   2 +-
 11 files changed, 334 insertions(+), 296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a4edefb..4a60e28 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -305,6 +305,9 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     /** Marshaller context. */
     private MarshallerContextImpl marshCtx;
 
+    /** */
+    private volatile boolean disconnected;
+
     /**
      * No-arg constructor is required by externalization.
      */
@@ -915,7 +918,14 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** {@inheritDoc} */
     @Override public boolean clientDisconnected() {
-        return locNode.isClient() && gateway().getState() == DISCONNECTED;
+        return locNode.isClient() && disconnected;
+    }
+
+    /**
+     * @param disconnected Disconnected flag.
+     */
+    void disconnected(boolean disconnected) {
+        this.disconnected = disconnected;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 5876288..4af69f7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -2818,6 +2818,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
 
         ctx.cluster().get().clientReconnectFuture(userFut);
 
+        ctx.disconnected(true);
+
         List<GridComponent> comps = ctx.components();
 
         for (ListIterator<GridComponent> it = comps.listIterator(comps.size()); it.hasPrevious();)
{
@@ -2850,10 +2852,12 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable
{
     /**
      * @param clusterRestarted {@code True} if all cluster nodes restarted while client was
disconnected.
      */
-    public void reconnected(boolean clusterRestarted) {
+    public void reconnected(final boolean clusterRestarted) {
         Throwable err = null;
 
         try {
+            ctx.disconnected(false);
+
             for (GridComponent comp : ctx.components())
                 comp.onReconnected(clusterRestarted);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
index 9e53bb5..ee95019 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamProcessor.java
@@ -27,6 +27,7 @@ import org.apache.ignite.internal.util.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.internal.util.worker.*;
+import org.apache.ignite.lang.*;
 import org.apache.ignite.marshaller.*;
 import org.apache.ignite.stream.*;
 import org.apache.ignite.thread.*;
@@ -139,6 +140,12 @@ public class DataStreamProcessor<K, V> extends GridProcessorAdapter
{
             log.debug("Stopped data streamer processor.");
     }
 
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException
{
+        for (DataStreamerImpl<?, ?> ldr : ldrs)
+            ldr.onDisconnected(reconnectFut);
+    }
+
     /**
      * @param cacheName Cache name ({@code null} for default cache).
      * @return Data loader.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 26b0568..b0be06d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -888,6 +888,13 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K,
V>, Delayed
     }
 
     /**
+     * @param reconnectFut Reconnect future.
+     */
+    public void onDisconnected(IgniteFuture<?> reconnectFut) {
+
+    }
+
+    /**
      * @return {@code true} If the loader is closed.
      */
     boolean isClosed() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
index 67ddc6f..556beea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProxy.java
@@ -165,6 +165,9 @@ class GridServiceProxy<T> implements Serializable {
                 catch (RuntimeException | Error e) {
                     throw e;
                 }
+                catch (IgniteCheckedException e) {
+                    throw U.convertException(e);
+                }
                 catch (Exception e) {
                     throw new IgniteException(e);
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
index 77927a7..7bd3531 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectDiscoveryStateTest.java
@@ -79,8 +79,14 @@ public class IgniteClientReconnectDiscoveryStateTest extends IgniteClientReconne
 
         client.events().localListen(new IgnitePredicate<Event>() {
             @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED)
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
                     info("Disconnected: " + evt);
+
+                    IgniteFuture<?> fut = client.cluster().clientReconnectFuture();
+
+                    assertNotNull(fut);
+                    assertFalse(fut.isDone());
+                }
                 else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
                     info("Reconnected: " + evt);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
deleted file mode 100644
index f938733..0000000
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverSelfTest.java
+++ /dev/null
@@ -1,290 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.*;
-import org.apache.ignite.transactions.*;
-
-import javax.cache.*;
-import javax.cache.processor.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.events.EventType.*;
-
-/**
- *
- */
-public class IgniteClientReconnectFailoverSelfTest extends IgniteClientReconnectAbstractTest
{
-    /** */
-    public final Integer THREADS = 8;
-
-    /** */
-    public final Integer RESTART_CNT = 30;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        cfg.setCacheConfiguration(new CacheConfiguration());
-
-        return cfg;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected int serverCount() {
-        return 1;
-    }
-
-    /** */
-    private volatile CyclicBarrier barrier;
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testCacheOperationReconnectApi() throws Exception {
-        clientMode = true;
-
-        final Ignite client = startGrid(serverCount());
-
-        assertNotNull(client.cache(null));
-
-        Ignite srv = clientRouter(client);
-
-        TestTcpDiscoverySpi srvSpi = spi(srv);
-
-        final AtomicBoolean stop = new AtomicBoolean(false);
-
-        final AtomicLong cntr = new AtomicLong();
-
-        final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration());
-
-        final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true);
-
-        final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true);
-
-        final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new
Callable<Object>() {
-            @Override public Object call() throws Exception {
-                IgniteCache<Integer, Integer> cache = client.cache(null);
-
-                IgniteCompute compute = client.compute();
-
-                Set<Integer> keys = new TreeSet<>();
-                final Map<Integer, Integer> entries = new TreeMap<>();
-
-                for (int i = 0; i < 50; i++) {
-                    keys.add(i);
-                    entries.put(i, i);
-                }
-
-                while (!stop.get()) {
-                    cntr.incrementAndGet();
-
-                    try {
-                        // Start cache operations.
-                        for (int i = 0; i < 10; i++) {
-                            cache.put(i, i);
-                            cache.get(i);
-                            cache.remove(i);
-
-                            cache.putAll(entries);
-
-                            cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer,
Object>() {
-                                @Override public Object process(MutableEntry<Integer,
Integer> entry,
-                                    Object... arguments) throws EntryProcessorException {
-                                    if (ThreadLocalRandom.current().nextBoolean())
-                                        entry.setValue(entry.getValue() * 100);
-                                    else
-                                        entry.remove();
-
-                                    return entry;
-                                }
-                            });
-                        }
-
-                        try (Transaction tx = client.transactions().txStart()) {
-                            for (int i = 0; i < 10; i++) {
-                                cache.put(i, i);
-                                cache.get(i);
-                            }
-
-                            tx.commit();
-                        }
-
-                        // Start async cache operations.
-                        IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
-
-                        for (int i = 0; i < 10; i++) {
-                            asyncCache.put(i, i);
-
-                            asyncCache.future().get();
-
-                            asyncCache.get(i);
-
-                            asyncCache.future().get();
-                        }
-
-                        // Compute.
-//                        for (int i = 0; i < 10; i++) {
-//                            compute.broadcast(new IgniteCallable<Integer>() {
-//                                @IgniteInstanceResource
-//                                private Ignite ignite;
-//
-//                                @Override public Integer call() throws Exception {
-//                                    return ignite.cache(null).localSize();
-//                                }
-//                            });
-//
-//                            compute.broadcast(new IgniteRunnable() {
-//                                @Override public void run() {
-//                                    // No-op.
-//                                }
-//                            });
-//
-//                            compute.apply(new C1<String, String>() {
-//                                @Override public String apply(String o) {
-//                                    return o.toUpperCase();
-//                                }
-//                            }, Arrays.asList("a", "b", "c"));
-//                        }
-
-                        //Data structures.
-//                        for (int i = 0; i < 10; i++) {
-//                            assert atomicLong.incrementAndGet() >= 0;
-//
-//                            queue.offer("Test item");
-//
-//                            if (ThreadLocalRandom.current().nextBoolean())
-//                                for (int j = 0; j < 50; j++)
-//                                    queue.poll();
-//
-//                            assert queue.size() <= 1000;
-//
-//                            assert sequence.addAndGet(i + 1) >= 0;
-//                        }
-                    }
-                    catch (CacheException | IgniteException e) {
-                        log.info("Operation failed, ignore: " + e);
-                    }
-
-                    if (cntr.get() % 100 == 0)
-                        log.info("Iteration: " + cntr);
-
-                    if (barrier != null)
-                        try {
-                            barrier.await();
-                        }
-                        catch (BrokenBarrierException e) {
-                            log.warning("Broken barrier.", e);
-
-                            break;
-                        }
-                }
-
-                return null;
-            }
-        }, THREADS, "test-operation-thread-" + client.name());
-
-        final AtomicBoolean disconnected = new AtomicBoolean(false);
-
-        final AtomicBoolean reconnected = new AtomicBoolean(false);
-
-        client.events().localListen(new IgnitePredicate<Event>() {
-            @Override public boolean apply(Event evt) {
-                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
-                    info("Disconnected: " + evt);
-
-                    if (!reconnected.get())
-                        disconnected.set(true);
-                }
-                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
-                    info("Reconnected: " + evt);
-
-                    if (disconnected.get())
-                        reconnected.set(true);
-                }
-
-                return true;
-            }
-        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
-
-        for (int i = 0; i < RESTART_CNT; i++) {
-            U.sleep(2000);
-
-            log.info("Block reconnect.");
-
-            reconnected.set(false);
-
-            disconnected.set(false);
-
-            log.info("Fail client.");
-
-            srvSpi.failNode(client.cluster().localNode().id(), null);
-
-            GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    return disconnected.get();
-                }
-            }, 5000L);
-
-            barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
-                @Override public void run() {
-                    barrier = null;
-                }
-            });
-
-            GridTestUtils.waitForCondition(new GridAbsPredicate() {
-                @Override public boolean apply() {
-                    return reconnected.get();
-                }
-            }, 5000L);
-
-            try {
-                barrier.await(10, TimeUnit.SECONDS);
-            }
-            catch (TimeoutException e) {
-                log.error("Failed. Operation hangs.");
-
-                for (Ignite ignite : G.allGrids())
-                    dumpCacheDebugInfo(ignite);
-
-                U.dumpThreads(log);
-
-                CyclicBarrier barrier0 = barrier;
-
-                if (barrier0 != null)
-                    barrier0.reset();
-
-                stop.set(true);
-
-                fail("Failed to wait for update.");
-            }
-        }
-
-        stop.set(true);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
new file mode 100644
index 0000000..e51d68a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectFailoverTest.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.transactions.*;
+
+import javax.cache.*;
+import javax.cache.processor.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ *
+ */
+public class IgniteClientReconnectFailoverTest extends IgniteClientReconnectAbstractTest
{
+    /** */
+    public final Integer THREADS = 8;
+
+    /** */
+    public final Integer RESTART_CNT = 30;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception
{
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setCacheConfiguration(new CacheConfiguration());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int serverCount() {
+        return 1;
+    }
+
+    /** */
+    private volatile CyclicBarrier barrier;
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testCacheOperationReconnectApi() throws Exception {
+        clientMode = true;
+
+        final Ignite client = startGrid(serverCount());
+
+        assertNotNull(client.cache(null));
+
+        Ignite srv = clientRouter(client);
+
+        TestTcpDiscoverySpi srvSpi = spi(srv);
+
+        final AtomicBoolean stop = new AtomicBoolean(false);
+
+        final AtomicLong cntr = new AtomicLong();
+
+        final IgniteQueue<Object> queue = client.queue("test-queue", 1000, new CollectionConfiguration());
+
+        final IgniteAtomicLong atomicLong = client.atomicLong("counter", 0, true);
+
+        final IgniteAtomicSequence sequence = client.atomicSequence("sequence", 0, true);
+
+        final IgniteInternalFuture<Long> future = GridTestUtils.runMultiThreadedAsync(new
Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgniteCache<Integer, Integer> cache = client.cache(null);
+
+                IgniteCompute compute = client.compute();
+
+                Set<Integer> keys = new TreeSet<>();
+                final Map<Integer, Integer> entries = new TreeMap<>();
+
+                for (int i = 0; i < 50; i++) {
+                    keys.add(i);
+                    entries.put(i, i);
+                }
+
+                while (!stop.get()) {
+                    cntr.incrementAndGet();
+
+                    try {
+                        // Start cache operations.
+                        for (int i = 0; i < 10; i++) {
+                            cache.put(i, i);
+                            cache.get(i);
+                            cache.remove(i);
+
+                            cache.putAll(entries);
+
+                            cache.invokeAll(keys, new CacheEntryProcessor<Integer, Integer,
Object>() {
+                                @Override public Object process(MutableEntry<Integer,
Integer> entry,
+                                    Object... arguments) throws EntryProcessorException {
+                                    if (ThreadLocalRandom.current().nextBoolean())
+                                        entry.setValue(entry.getValue() * 100);
+                                    else
+                                        entry.remove();
+
+                                    return entry;
+                                }
+                            });
+                        }
+
+                        try (Transaction tx = client.transactions().txStart()) {
+                            for (int i = 0; i < 10; i++) {
+                                cache.put(i, i);
+                                cache.get(i);
+                            }
+
+                            tx.commit();
+                        }
+
+                        // Start async cache operations.
+                        IgniteCache<Integer, Integer> asyncCache = cache.withAsync();
+
+                        for (int i = 0; i < 10; i++) {
+                            asyncCache.put(i, i);
+
+                            asyncCache.future().get();
+
+                            asyncCache.get(i);
+
+                            asyncCache.future().get();
+                        }
+
+                        // Compute.
+//                        for (int i = 0; i < 10; i++) {
+//                            compute.broadcast(new IgniteCallable<Integer>() {
+//                                @IgniteInstanceResource
+//                                private Ignite ignite;
+//
+//                                @Override public Integer call() throws Exception {
+//                                    return ignite.cache(null).localSize();
+//                                }
+//                            });
+//
+//                            compute.broadcast(new IgniteRunnable() {
+//                                @Override public void run() {
+//                                    // No-op.
+//                                }
+//                            });
+//
+//                            compute.apply(new C1<String, String>() {
+//                                @Override public String apply(String o) {
+//                                    return o.toUpperCase();
+//                                }
+//                            }, Arrays.asList("a", "b", "c"));
+//                        }
+
+                        //Data structures.
+//                        for (int i = 0; i < 10; i++) {
+//                            assert atomicLong.incrementAndGet() >= 0;
+//
+//                            queue.offer("Test item");
+//
+//                            if (ThreadLocalRandom.current().nextBoolean())
+//                                for (int j = 0; j < 50; j++)
+//                                    queue.poll();
+//
+//                            assert queue.size() <= 1000;
+//
+//                            assert sequence.addAndGet(i + 1) >= 0;
+//                        }
+                    }
+                    catch (CacheException | IgniteException e) {
+                        log.info("Operation failed, ignore: " + e);
+                    }
+
+                    if (cntr.get() % 100 == 0)
+                        log.info("Iteration: " + cntr);
+
+                    if (barrier != null)
+                        try {
+                            barrier.await();
+                        }
+                        catch (BrokenBarrierException e) {
+                            log.warning("Broken barrier.", e);
+
+                            break;
+                        }
+                }
+
+                return null;
+            }
+        }, THREADS, "test-operation-thread-" + client.name());
+
+        final AtomicBoolean disconnected = new AtomicBoolean(false);
+
+        final AtomicBoolean reconnected = new AtomicBoolean(false);
+
+        client.events().localListen(new IgnitePredicate<Event>() {
+            @Override public boolean apply(Event evt) {
+                if (evt.type() == EVT_CLIENT_NODE_DISCONNECTED) {
+                    info("Disconnected: " + evt);
+
+                    if (!reconnected.get())
+                        disconnected.set(true);
+                }
+                else if (evt.type() == EVT_CLIENT_NODE_RECONNECTED) {
+                    info("Reconnected: " + evt);
+
+                    if (disconnected.get())
+                        reconnected.set(true);
+                }
+
+                return true;
+            }
+        }, EVT_CLIENT_NODE_DISCONNECTED, EVT_CLIENT_NODE_RECONNECTED);
+
+        for (int i = 0; i < RESTART_CNT; i++) {
+            U.sleep(2000);
+
+            log.info("Block reconnect.");
+
+            reconnected.set(false);
+
+            disconnected.set(false);
+
+            log.info("Fail client.");
+
+            srvSpi.failNode(client.cluster().localNode().id(), null);
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return disconnected.get();
+                }
+            }, 5000L);
+
+            barrier = new CyclicBarrier(THREADS + 1, new Runnable() {
+                @Override public void run() {
+                    barrier = null;
+                }
+            });
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return reconnected.get();
+                }
+            }, 5000L);
+
+            try {
+                barrier.await(10, TimeUnit.SECONDS);
+            }
+            catch (TimeoutException e) {
+                log.error("Failed. Operation hangs.");
+
+                for (Ignite ignite : G.allGrids())
+                    dumpCacheDebugInfo(ignite);
+
+                U.dumpThreads(log);
+
+                CyclicBarrier barrier0 = barrier;
+
+                if (barrier0 != null)
+                    barrier0.reset();
+
+                stop.set(true);
+
+                fail("Failed to wait for update.");
+            }
+        }
+
+        stop.set(true);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
index 58715a1..6ccbbe0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectServicesTest.java
@@ -18,7 +18,7 @@
 package org.apache.ignite.internal;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.continuous.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
 import org.apache.ignite.internal.processors.service.*;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.services.*;
@@ -129,7 +129,7 @@ public class IgniteClientReconnectServicesTest extends IgniteClientReconnectAbst
 
         BlockTpcCommunicationSpi commSpi = commSpi(srv);
 
-        commSpi.blockMsg(GridContinuousMessage.class);
+        commSpi.blockMsg(GridNearTxPrepareResponse.class);
 
         final IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>()
{
             @Override public Object call() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
index e85c315..50feb86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectStreamerTest.java
@@ -81,6 +81,7 @@ public class IgniteClientReconnectStreamerTest extends IgniteClientReconnectAbst
                 return srvCache.localSize() == 50;
             }
         }, 2000L);
+
         reconnectClientNode(client, srv, null);
 
         for (int i = 0; i < 50; i++)

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/29c7fa73/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
index fb41f0f..93137bf 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteClientReconnectTestSuite.java
@@ -33,7 +33,6 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
 
         suite.addTestSuite(IgniteClientReconnectStopTest.class);
         suite.addTestSuite(IgniteClientReconnectApiBlockTest.class);
-        suite.addTestSuite(IgniteClientReconnectFailoverSelfTest.class);
         suite.addTestSuite(IgniteClientReconnectDiscoveryStateTest.class);
         suite.addTestSuite(IgniteClientReconnectCacheTest.class);
         suite.addTestSuite(IgniteClientReconnectContinuousProcessorTest.class);
@@ -42,6 +41,7 @@ public class IgniteClientReconnectTestSuite extends TestSuite {
         suite.addTestSuite(IgniteClientReconnectCollectionsTest.class);
         suite.addTestSuite(IgniteClientReconnectServicesTest.class);
         suite.addTestSuite(IgniteClientReconnectStreamerTest.class);
+        suite.addTestSuite(IgniteClientReconnectFailoverTest.class);
 
         return suite;
     }


Mime
View raw message