ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [07/19] ignite git commit: ignite-1.5 Fixed hang on metadata update inside put in atomic cache when topology read lock is held. Also fixed several test issues.
Date Thu, 24 Dec 2015 18:06:31 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index 5a4ba14..283da80 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -1009,107 +1009,111 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
final CacheEventListener2 lsnr,
         boolean lostAllow, boolean wait) throws Exception {
-        if (wait)
+        if (wait) {
             GridTestUtils.waitForCondition(new PA() {
-                @Override public boolean apply() {
+                @Override
+                public boolean apply() {
                     return expEvts.size() == lsnr.size();
                 }
             }, 2000L);
+        }
 
-        Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
+        synchronized (lsnr) {
+            Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
 
-        for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
-            prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
+            for (Map.Entry<Integer, List<CacheEntryEvent<?, ?>>> e : lsnr.evts.entrySet())
+                prevMap.put(e.getKey(), new ArrayList<>(e.getValue()));
 
-        List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
+            List<T3<Object, Object, Object>> lostEvts = new ArrayList<>();
 
-        for (T3<Object, Object, Object> exp : expEvts) {
-            List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
+            for (T3<Object, Object, Object> exp : expEvts) {
+                List<CacheEntryEvent<?, ?>> rcvdEvts = lsnr.evts.get(exp.get1());
 
-            if (F.eq(exp.get2(), exp.get3()))
-                continue;
+                if (F.eq(exp.get2(), exp.get3()))
+                    continue;
 
-            if (rcvdEvts == null || rcvdEvts.isEmpty()) {
-                lostEvts.add(exp);
+                if (rcvdEvts == null || rcvdEvts.isEmpty()) {
+                    lostEvts.add(exp);
 
-                continue;
-            }
+                    continue;
+                }
 
-            Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
+                Iterator<CacheEntryEvent<?, ?>> iter = rcvdEvts.iterator();
 
-            boolean found = false;
+                boolean found = false;
 
-            while (iter.hasNext()) {
-                CacheEntryEvent<?, ?> e = iter.next();
+                while (iter.hasNext()) {
+                    CacheEntryEvent<?, ?> e = iter.next();
 
-                if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
-                    && equalOldValue(e, exp)) {
-                    found = true;
+                    if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue()))
+                            && equalOldValue(e, exp)) {
+                        found = true;
 
-                    iter.remove();
+                        iter.remove();
 
-                    break;
+                        break;
+                    }
                 }
-            }
 
-            // Lost event is acceptable.
-            if (!found)
-                lostEvts.add(exp);
-        }
+                // Lost event is acceptable.
+                if (!found)
+                    lostEvts.add(exp);
+            }
 
-        boolean dup = false;
+            boolean dup = false;
 
-        // Check duplicate.
-        if (!lsnr.evts.isEmpty()) {
-            for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
-                if (!evts.isEmpty()) {
-                    for (CacheEntryEvent<?, ?> e : evts) {
-                        boolean found = false;
+            // Check duplicate.
+            if (!lsnr.evts.isEmpty()) {
+                for (List<CacheEntryEvent<?, ?>> evts : lsnr.evts.values()) {
+                    if (!evts.isEmpty()) {
+                        for (CacheEntryEvent<?, ?> e : evts) {
+                            boolean found = false;
 
-                        for (T3<Object, Object, Object> lostEvt : lostEvts) {
-                            if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2()))
{
-                                found = true;
+                            for (T3<Object, Object, Object> lostEvt : lostEvts) {
+                                if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2()))
{
+                                    found = true;
 
-                                lostEvts.remove(lostEvt);
+                                    lostEvts.remove(lostEvt);
 
-                                break;
+                                    break;
+                                }
                             }
-                        }
 
-                        if (!found) {
-                            dup = true;
+                            if (!found) {
+                                dup = true;
 
-                            break;
+                                break;
+                            }
                         }
                     }
                 }
-            }
 
-            if (dup) {
-                for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
-                    if (!e.isEmpty()) {
-                        for (CacheEntryEvent<?, ?> event : e)
-                            log.error("Got duplicate event: " + event);
+                if (dup) {
+                    for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values())
{
+                        if (!e.isEmpty()) {
+                            for (CacheEntryEvent<?, ?> event : e)
+                                log.error("Got duplicate event: " + event);
+                        }
                     }
                 }
             }
-        }
 
-        if (!lostAllow && lostEvts.size() > 100) {
-            log.error("Lost event cnt: " + lostEvts.size());
+            if (!lostAllow && lostEvts.size() > 100) {
+                log.error("Lost event cnt: " + lostEvts.size());
 
-            for (T3<Object, Object, Object> e : lostEvts)
-                log.error("Lost event: " + e);
+                for (T3<Object, Object, Object> e : lostEvts)
+                    log.error("Lost event: " + e);
 
-            fail("Lose events, see log for details.");
-        }
+                fail("Lose events, see log for details.");
+            }
 
-        log.error("Lost event cnt: " + lostEvts.size());
+            log.error("Lost event cnt: " + lostEvts.size());
 
-        expEvts.clear();
+            expEvts.clear();
 
-        lsnr.evts.clear();
-        lsnr.vals.clear();
+            lsnr.evts.clear();
+            lsnr.vals.clear();
+        }
     }
 
     /**
@@ -2111,7 +2115,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
         /**
          * @return Count events.
          */
-        public int size() {
+        public synchronized int size() {
             int size = 0;
 
             for (List<CacheEntryEvent<?, ?>> e : evts.values())

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 5f5dfd4..db59a7f 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.PA;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -128,6 +129,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends
GridCommo
 
         cfg.setDiscoverySpi(disco);
 
+        ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1);
+
         return cfg;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
index b529b6c..49c6968 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/ClosureServiceClientsNodesTest.java
@@ -27,6 +27,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteCallable;
 import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
@@ -38,6 +39,7 @@ import org.apache.ignite.services.ServiceDescriptor;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -171,14 +173,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest
{
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);
 
-            Ignite ignite = grid(i);
+            final Ignite ignite = grid(i);
 
             ignite.services().deployNodeSingleton(SINGLETON_NAME, new TestService());
 
-            ClusterGroup grp = ignite.cluster();
+            final ClusterGroup grp = ignite.cluster();
 
             assertEquals(NODES_CNT, grp.nodes().size());
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return ignite.services(grp).serviceDescriptors().size() == 1;
+                }
+            }, 5000);
+
             Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors();
 
             assertEquals(1, srvDscs.size());
@@ -206,14 +214,20 @@ public class ClosureServiceClientsNodesTest extends GridCommonAbstractTest
{
         for (int i = 0 ; i < NODES_CNT; i++) {
             log.info("Iteration: " + i);
 
-            Ignite ignite = grid(i);
+            final Ignite ignite = grid(i);
 
             ignite.services(ignite.cluster().forClients()).deployNodeSingleton(SINGLETON_NAME,
new TestService());
 
-            ClusterGroup grp = ignite.cluster();
+            final ClusterGroup grp = ignite.cluster();
 
             assertEquals(NODES_CNT, grp.nodes().size());
 
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    return ignite.services(grp).serviceDescriptors().size() == 1;
+                }
+            }, 5000);
+
             Collection<ServiceDescriptor> srvDscs = ignite.services(grp).serviceDescriptors();
 
             assertEquals(1, srvDscs.size());

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
index dfea37a..92b18ab 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/service/GridServiceProcessorStopSelfTest.java
@@ -17,15 +17,18 @@
 
 package org.apache.ignite.internal.processors.service;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteServices;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.services.Service;
 import org.apache.ignite.services.ServiceContext;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 
 /**
@@ -49,10 +52,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest
{
 
         final Ignite ignite = startGrid(0);
 
-        Thread t = new Thread(new Runnable() {
-            @Override public void run() {
-                Thread.currentThread().setName("deploy-thread");
-
+        IgniteInternalFuture<?> fut = GridTestUtils.runAsync(new Callable<Void>()
{
+            @Override public Void call() throws Exception {
                 IgniteServices svcs = ignite.services();
 
                 IgniteServices services = svcs.withAsync();
@@ -67,13 +68,13 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest
{
                 catch (IgniteException e) {
                     finishLatch.countDown();
                 }
-                catch (Throwable e) {
-                    log.error("Service deployment error: ", e);
+                finally {
+                    finishLatch.countDown();
                 }
-            }
-        });
 
-        t.start();
+                return null;
+            }
+        }, "deploy-thread");
 
         depLatch.await();
 
@@ -85,6 +86,8 @@ public class GridServiceProcessorStopSelfTest extends GridCommonAbstractTest
{
             U.dumpThreads(log);
 
         assertTrue("Deploy future isn't completed", wait);
+
+        fut.get();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
index 731b0c7..7bbf531 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/communication/tcp/GridTcpCommunicationSpiMultithreadedSelfTest.java
@@ -37,6 +37,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.nio.GridCommunicationClient;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
@@ -55,6 +56,7 @@ import org.apache.ignite.spi.communication.GridTestMessage;
 import org.apache.ignite.testframework.GridSpiTestContext;
 import org.apache.ignite.testframework.GridTestNode;
 import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.IgniteTestResources;
 import org.apache.ignite.testframework.junits.spi.GridSpiAbstractTest;
 import org.jsr166.ConcurrentLinkedDeque8;
@@ -90,6 +92,9 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
     /** Initialized nodes */
     private static final List<ClusterNode> nodes = new ArrayList<>();
 
+    /** */
+    private static GridTimeoutProcessor timeoutProcessor;
+
     /** Flag indicating if listener should reject messages. */
     private static boolean reject;
 
@@ -472,6 +477,12 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
         Map<ClusterNode, GridSpiTestContext> ctxs = new HashMap<>();
 
+        timeoutProcessor = new GridTimeoutProcessor(new GridTestKernalContext(log));
+
+        timeoutProcessor.start();
+
+        timeoutProcessor.onKernalStart();
+
         for (int i = 0; i < getSpiCount(); i++) {
             CommunicationSpi<Message> spi = newCommunicationSpi();
 
@@ -485,6 +496,8 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
             GridSpiTestContext ctx = initSpiContext();
 
+            ctx.timeoutProcessor(timeoutProcessor);
+
             ctx.setLocalNode(node);
 
             info(">>> Initialized context: nodeId=" + ctx.localNode().id());
@@ -548,6 +561,14 @@ public class GridTcpCommunicationSpiMultithreadedSelfTest extends GridSpiAbstrac
 
     /** {@inheritDoc} */
     @Override protected void afterTestsStopped() throws Exception {
+        if (timeoutProcessor != null) {
+            timeoutProcessor.onKernalStop(true);
+
+            timeoutProcessor.stop(true);
+
+            timeoutProcessor = null;
+        }
+
         for (CommunicationSpi<Message> spi : spis.values()) {
             spi.onContextDestroyed();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
index 5af0596..0df7da6 100644
--- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySelfTest.java
@@ -897,8 +897,8 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
      */
     public void testIpFinderCleaning() throws Exception {
         try {
-            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024),
-                new InetSocketAddress("host2", 1024)));
+            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
+                new InetSocketAddress("1.1.1.2", 1024)));
 
             Ignite g1 = startGrid(1);
 
@@ -912,13 +912,19 @@ public class TcpDiscoverySelfTest extends GridCommonAbstractTest {
                 }
             }, timeout);
 
+            if (ipFinder.getRegisteredAddresses().size() != 1) {
+                log.error("Failed to wait for IP cleanup, will dump threads.");
+
+                U.dumpThreads(log);
+            }
+
             assert ipFinder.getRegisteredAddresses().size() == 1 : "ipFinder=" + ipFinder.getRegisteredAddresses();
 
             // Check that missing addresses are returned back.
             ipFinder.unregisterAddresses(ipFinder.getRegisteredAddresses()); // Unregister
valid address.
 
-            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("host1", 1024),
-                new InetSocketAddress("host2", 1024)));
+            ipFinder.registerAddresses(Arrays.asList(new InetSocketAddress("1.1.1.1", 1024),
+                new InetSocketAddress("1.1.1.2", 1024)));
 
             GridTestUtils.waitForCondition(new GridAbsPredicate() {
                 @Override public boolean apply() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index e257a97..0bffe8b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -41,6 +41,8 @@ import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.timeout.GridSpiTimeoutObject;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
@@ -89,6 +91,16 @@ public class GridSpiTestContext implements IgniteSpiContext {
     /** */
     private MessageFactory factory;
 
+    /** */
+    private GridTimeoutProcessor timeoutProcessor;
+
+    /**
+     * @param timeoutProcessor Timeout processor.
+     */
+    public void timeoutProcessor(GridTimeoutProcessor timeoutProcessor) {
+        this.timeoutProcessor = timeoutProcessor;
+    }
+
     /** {@inheritDoc} */
     @Override public Collection<ClusterNode> remoteNodes() {
         return rmtNodes;
@@ -530,12 +542,14 @@ public class GridSpiTestContext implements IgniteSpiContext {
 
     /** {@inheritDoc} */
     @Override public void addTimeoutObject(IgniteSpiTimeoutObject obj) {
-        // No-op.
+        if (timeoutProcessor != null)
+            timeoutProcessor.addTimeoutObject(new GridSpiTimeoutObject(obj));
     }
 
     /** {@inheritDoc} */
     @Override public void removeTimeoutObject(IgniteSpiTimeoutObject obj) {
-        // No-op.
+        if (timeoutProcessor != null)
+            timeoutProcessor.removeTimeoutObject(new GridSpiTimeoutObject(obj));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
index 77e2dae..4a84931 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionFilter.java
@@ -38,14 +38,16 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteTransactions;
-import org.apache.ignite.cache.CachePartialUpdateException;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.startup.servlet.ServletContextListenerStartup;
 import org.apache.ignite.transactions.Transaction;
 
@@ -191,6 +193,9 @@ public class WebSessionFilter implements Filter {
     /** Transactions enabled flag. */
     private boolean txEnabled;
 
+    /** */
+    private int retries;
+
     /** {@inheritDoc} */
     @Override public void init(FilterConfig cfg) throws ServletException {
         ctx = cfg.getServletContext();
@@ -207,8 +212,6 @@ public class WebSessionFilter implements Filter {
             cfg.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM),
             ctx.getInitParameter(WEB_SES_MAX_RETRIES_ON_FAIL_NAME_PARAM));
 
-        int retries;
-
         try {
             retries = retriesStr != null ? Integer.parseInt(retriesStr) : DFLT_MAX_RETRIES_ON_FAIL;
         }
@@ -226,10 +229,6 @@ public class WebSessionFilter implements Filter {
 
         log = webSesIgnite.log();
 
-        if (webSesIgnite == null)
-            throw new IgniteException("Grid for web sessions caching is not started (is it
configured?): " +
-                gridName);
-
         cache = webSesIgnite.cache(cacheName);
 
         if (cache == null)
@@ -409,41 +408,62 @@ public class WebSessionFilter implements Filter {
 
         WebSession cached = new WebSession(ses, true);
 
-        try {
-            while (true) {
-                try {
-                    IgniteCache<String, WebSession> cache0;
-
-                    if (cached.getMaxInactiveInterval() > 0) {
-                        long ttl = cached.getMaxInactiveInterval() * 1000;
+        for (int i = 0; i < retries; i++) {
+            try {
+                IgniteCache<String, WebSession> cache0;
 
-                        ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS,
ttl));
+                if (cached.getMaxInactiveInterval() > 0) {
+                    long ttl = cached.getMaxInactiveInterval() * 1000;
 
-                        cache0 = cache.withExpiryPolicy(plc);
-                    }
-                    else
-                        cache0 = cache;
+                    ExpiryPolicy plc = new ModifiedExpiryPolicy(new Duration(MILLISECONDS,
ttl));
 
-                    WebSession old = cache0.getAndPutIfAbsent(sesId, cached);
+                    cache0 = cache.withExpiryPolicy(plc);
+                }
+                else
+                    cache0 = cache;
 
-                    if (old != null) {
-                        cached = old;
+                WebSession old = cache0.getAndPutIfAbsent(sesId, cached);
 
-                        if (cached.isNew())
-                            cached = new WebSession(cached, false);
-                    }
+                if (old != null) {
+                    cached = old;
 
-                    break;
+                    if (cached.isNew())
+                        cached = new WebSession(cached, false);
                 }
-                catch (CachePartialUpdateException e) {
+
+                break;
+            }
+            catch (CacheException | IgniteException e) {
+                if (log.isDebugEnabled())
+                    log.debug(e.getMessage());
+
+                if (i == retries - 1)
+                    throw new IgniteException("Failed to save session: " + sesId, e);
+                else {
                     if (log.isDebugEnabled())
-                        log.debug(e.getMessage());
+                        log.debug("Failed to save session (will retry): " + sesId);
+
+                    IgniteFuture<?> retryFut = null;
+
+                    if (X.hasCause(e, ClusterTopologyException.class)) {
+                        ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                        assert cause != null : e;
+
+                        retryFut = cause.retryReadyFuture();
+                    }
+
+                    if (retryFut != null) {
+                        try {
+                            retryFut.get();
+                        }
+                        catch (IgniteException retryErr) {
+                            throw new IgniteException("Failed to save session: " + sesId,
retryErr);
+                        }
+                    }
                 }
             }
         }
-        catch (CacheException e) {
-            throw new IgniteException("Failed to save session: " + sesId, e);
-        }
 
         return cached;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
----------------------------------------------------------------------
diff --git a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
index 82f1633..b826031 100644
--- a/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
+++ b/modules/web/src/main/java/org/apache/ignite/cache/websession/WebSessionListener.java
@@ -30,12 +30,14 @@ import javax.cache.processor.EntryProcessor;
 import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.cache.CachePartialUpdateException;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteFuture;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
@@ -117,7 +119,7 @@ class WebSessionListener {
 
                     break;
                 }
-                catch (CachePartialUpdateException ignored) {
+                catch (CacheException | IgniteException e) {
                     if (i == retries - 1) {
                         U.warn(log, "Failed to apply updates for session (maximum number
of retries exceeded) [sesId=" +
                             sesId + ", retries=" + retries + ']');
@@ -125,12 +127,25 @@ class WebSessionListener {
                     else {
                         U.warn(log, "Failed to apply updates for session (will retry): "
+ sesId);
 
-                        U.sleep(RETRY_DELAY);
+                        IgniteFuture<?> retryFut = null;
+
+                        if (X.hasCause(e, ClusterTopologyException.class)) {
+                            ClusterTopologyException cause = X.cause(e, ClusterTopologyException.class);
+
+                            assert cause != null : e;
+
+                            retryFut = cause.retryReadyFuture();
+                        }
+
+                        if (retryFut != null)
+                            retryFut.get();
+                        else
+                            U.sleep(RETRY_DELAY);
                     }
                 }
             }
         }
-        catch (CacheException | IgniteInterruptedCheckedException e) {
+        catch (Exception e) {
             U.error(log, "Failed to update session attributes [id=" + sesId + ']', e);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/53ec76ff/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
index 4508edb..7a321d6 100644
--- a/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
+++ b/modules/web/src/test/java/org/apache/ignite/internal/websession/WebSessionSelfTest.java
@@ -142,7 +142,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
                     }
 
                     assert idx != -1;
-                    assert srv != null;
 
                     stopServer(srv);
 
@@ -181,7 +180,6 @@ public class WebSessionSelfTest extends GridCommonAbstractTest {
                 }
 
                 assert idx != -1;
-                assert srv != null;
 
                 int port = TEST_JETTY_PORT + idx;
 


Mime
View raw message