ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject ignite git commit: IGNITE-462 Fixed tests.
Date Mon, 09 Nov 2015 12:12:44 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-426-2 65fed011e -> f89eb2166


IGNITE-462 Fixed tests.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f89eb216
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f89eb216
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f89eb216

Branch: refs/heads/ignite-426-2
Commit: f89eb21662a34501129a0a9b190cd700cda45595
Parents: 65fed01
Author: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Authored: Mon Nov 9 15:16:04 2015 +0300
Committer: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Committed: Mon Nov 9 15:16:04 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   | 17 ++++-
 .../continuous/CacheContinuousQueryEvent.java   |  3 +-
 .../continuous/CacheContinuousQueryHandler.java | 70 ++++++++++++++++++--
 .../CacheContinuousQueryListener.java           |  6 ++
 .../continuous/CacheContinuousQueryManager.java | 25 +++++++
 ...ContinuousQueryFailoverAbstractSelfTest.java | 67 +++++++++++--------
 6 files changed, 151 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f89eb216/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index ae0355b..7befd42 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -332,8 +332,21 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
             cctx.mvcc().removeAtomicFuture(version());
 
             if (err != null) {
-                for (KeyCacheObject key : keys)
+                int i = 0;
+
+                for (KeyCacheObject key : keys) {
                     updateRes.addFailedKey(key, err);
+
+                    if (i < updates.size()) {
+
+                        T4<GridDhtCacheEntry, CacheObject, CacheObject, Long> upd =
updates.get(i);
+
+                        cctx.continuousQueries().skipUpdateEvent(key, upd.get1().partition(),
upd.get4(),
+                            updateReq.topologyVersion());
+
+                        ++i;
+                    }
+                }
             }
             else {
                 assert keys.size() >= updates.size();
@@ -348,7 +361,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
 
                     try {
                         cctx.continuousQueries().onEntryUpdated(upd.get1(), key, upd.get2(),
upd.get3(), true, false,
-                            upd.get4(), updateRes.topologyVersion());
+                            upd.get4(), updateReq.topologyVersion());
                     }
                     catch (IgniteCheckedException e) {
                         U.warn(log, "Failed to send continuous query message. [key=" + key
+ ", newVal="

http://git-wip-us.apache.org/repos/asf/ignite/blob/f89eb216/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index 7417138..a1ebe39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -58,8 +58,7 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K,
V> {
     }
 
     /** {@inheritDoc} */
-    @Override
-    public K getKey() {
+    @Override public K getKey() {
         return e.key().value(cctx.cacheObjectContext(), false);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/f89eb216/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 6c5deff..bd6f5d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -248,7 +248,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
 
         assert !skipPrimaryCheck || loc;
 
-        GridCacheContext<K, V> cctx = cacheContext(ctx);
+        final GridCacheContext<K, V> cctx = cacheContext(ctx);
 
         if (!internal && cctx != null && initUpdCntrs != null) {
             Map<Integer, Long> map = cctx.topology().updateCounters();
@@ -440,6 +440,40 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 sendBackupAcknowledge(ackBuf.acknowledgeOnTimeout(), routineId, ctx);
             }
 
+            @Override public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt,
AffinityTopologyVersion topVer) {
+                try {
+                    assert evt != null;
+
+                    CacheContinuousQueryEntry e = evt.entry();
+
+                    EntryBuffer buf = snds.get(e.partition());
+
+                    if (buf == null) {
+                        buf = new EntryBuffer();
+
+                        EntryBuffer oldRec = snds.putIfAbsent(e.partition(), buf);
+
+                        if (oldRec != null)
+                            buf = oldRec;
+                    }
+
+                    e = buf.skipEntry(e, topVer);
+
+                    if (e != null)
+                        ctx.continuous().addNotification(nodeId, routineId, e, topic, sync,
true);
+                }
+                catch (ClusterTopologyCheckedException ex) {
+                    IgniteLogger log = ctx.log(getClass());
+
+                    if (log.isDebugEnabled())
+                        log.debug("Failed to send event notification to node, node left cluster
" +
+                                "[node=" + nodeId + ", err=" + ex + ']');
+                }
+                catch (IgniteCheckedException ex) {
+                    U.error(ctx.log(getClass()), "Failed to send event notification to node:
" + nodeId, ex);
+                }
+            }
+
             @Override public void onPartitionEvicted(int part) {
                 for (Iterator<CacheContinuousQueryEntry> it = backupQueue.iterator();
it.hasNext();) {
                     if (it.next().partition() == part)
@@ -811,6 +845,34 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         }
 
         /**
+         * @param e Entry.
+         * @param topVer Topology version.
+         * @return Continuous query entry.
+         */
+        private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e, AffinityTopologyVersion
topVer) {
+            if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) {
+
+                e.markFiltered();
+
+                return e;
+            }
+            else {
+                buf.add(e.updateCounter());
+
+                // Double check. If another thread sent a event with counter higher than
this event.
+                if (lastFiredCntr.get() > e.updateCounter() && buf.contains(e.updateCounter()))
{
+                    buf.remove(e.updateCounter());
+
+                    e.markFiltered();
+
+                    return e;
+                }
+                else
+                    return null;
+            }
+        }
+
+        /**
          * Add continuous entry.
          *
          * @param e Cache continuous query entry.
@@ -819,12 +881,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) {
             assert e != null;
 
-            if (!e.isBackup()) {
-                int z = 0;
-
-                ++z;
-            }
-
             if (e.isFiltered()) {
                 Long last = buf.lastx();
                 Long first = buf.firstx();

http://git-wip-us.apache.org/repos/asf/ignite/blob/f89eb216/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 4f345ed..4f28bce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -66,6 +66,12 @@ interface CacheContinuousQueryListener<K, V> {
     public void acknowledgeBackupOnTimeout(GridKernalContext ctx);
 
     /**
+     * @param evt Event
+     * @param topVer Topology version.
+     */
+    public void skipUpdateEvent(CacheContinuousQueryEvent<K, V> evt, AffinityTopologyVersion
topVer);
+
+    /**
      * @param part Partition.
      */
     public void onPartitionEvicted(int part);

http://git-wip-us.apache.org/repos/asf/ignite/blob/f89eb216/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 5f77c2e..925561b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -158,6 +158,31 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
     }
 
     /**
+     * @param partId Partition id.
+     * @param updCntr Updated counter.
+     * @param topVer Topology version.
+     */
+    public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion
topVer) {
+        if (lsnrCnt.get() > 0) {
+            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                cctx.cacheId(),
+                UPDATED,
+                key,
+                null,
+                null,
+                partId,
+                updCntr,
+                topVer);
+
+            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+                    cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
+
+            for (CacheContinuousQueryListener lsnr : lsnrs.values())
+                lsnr.skipUpdateEvent(evt, topVer);
+        }
+    }
+
+    /**
      * @param e Cache entry.
      * @param key Key.
      * @param newVal New value.

http://git-wip-us.apache.org/repos/asf/ignite/blob/f89eb216/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
index a0b1878..e82c673 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java
@@ -338,7 +338,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
 
         int killedNode = rnd.nextInt(SRV_NODES);
 
-        for (int i = 0; i < 20; i++) {
+        for (int i = 0; i < 10; i++) {
             List<Integer> keys = testKeys(grid(0).cache(null), 10);
 
             for (Integer key : keys) {
@@ -1004,11 +1004,22 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
      */
     private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
final CacheEventListener2 lsnr,
         boolean lostAllow) throws Exception {
-        GridTestUtils.waitForCondition(new PA() {
-            @Override public boolean apply() {
-                return expEvts.size() == lsnr.size();
-            }
-        }, 2000L);
+        checkEvents(expEvts, lsnr, lostAllow, true);
+    }
+
+    /**
+     * @param expEvts Expected events.
+     * @param lsnr Listener.
+     * @param lostAllow If {@code true} than won't assert on lost events.
+     */
+    private void checkEvents(final List<T3<Object, Object, Object>> expEvts,
final CacheEventListener2 lsnr,
+        boolean lostAllow, boolean wait) throws Exception {
+        if (wait)
+            GridTestUtils.waitForCondition(new PA() {
+                @Override public boolean apply() {
+                    return expEvts.size() == lsnr.size();
+                }
+            }, 2000L);
 
         Map<Integer, List<CacheEntryEvent<?, ?>>> prevMap = new HashMap<>(lsnr.evts.size());
 
@@ -1080,26 +1091,16 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
             }
 
             if (dup) {
-                for (T3<Object, Object, Object> e : lostEvents)
-                    log.error("Lost event: " + e);
-
                 for (List<CacheEntryEvent<?, ?>> e : lsnr.evts.values()) {
                     if (!e.isEmpty()) {
-                        for (CacheEntryEvent<?, ?> event : e) {
-                            List<CacheEntryEvent<?, ?>> entries = new ArrayList<>();
-
-                            for (CacheEntryEvent<?, ?> ev0 : prevMap.get(event.getKey()))
{
-                                if (F.eq(event.getValue(), ev0.getValue()) && F.eq(event.getOldValue(),
-                                    ev0.getOldValue()))
-                                    entries.add(ev0);
-                            }
-                        }
+                        for (CacheEntryEvent<?, ?> event : e)
+                            log.error("Got duplicate event: " + event);
                     }
                 }
             }
         }
 
-        if (!lostAllow && !lostEvents.isEmpty()) {
+        if (!lostAllow && lostEvents.size() > 100) {
             log.error("Lost event cnt: " + lostEvents.size());
 
             for (T3<Object, Object, Object> e : lostEvents)
@@ -1699,23 +1700,37 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest
extends GridC
 
         final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>();
 
+        final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
         IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>()
{
             @Override public Void call() throws Exception {
-                final int idx = SRV_NODES + 1;
-
                 while (!stop.get() && !err) {
-                    log.info("Start node: " + idx);
+                    final int idx = rnd.nextInt(SRV_NODES);
 
-                    startGrid(idx);
+                    log.info("Stop node: " + idx);
+
+                    stopGrid(idx);
 
                     Thread.sleep(300);
 
+                    GridTestUtils.waitForCondition(new PA() {
+                        @Override public boolean apply() {
+                            return qryCln.cluster().nodes().size() == SRV_NODES;
+                        }
+                    }, 5000L);
+
                     try {
-                        log.info("Stop node: " + idx);
+                        log.info("Start node: " + idx);
 
-                        stopGrid(idx);
+                        startGrid(idx);
 
                         Thread.sleep(300);
+
+                        GridTestUtils.waitForCondition(new PA() {
+                            @Override public boolean apply() {
+                                return qryCln.cluster().nodes().size() == SRV_NODES + 1;
+                            }
+                        }, 5000L);
                     }
                     catch (Exception e) {
                         log.warning("Failed to stop nodes.", e);
@@ -1742,7 +1757,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends
GridC
                                 for (List<T3<Object, Object, Object>> evt : expEvts)
                                     expEvts0.addAll(evt);
 
-                                checkEvents(expEvts0, lsnr, false);
+                                checkEvents(expEvts0, lsnr, false, false);
 
                                 for (List<T3<Object, Object, Object>> evt : expEvts)
                                     evt.clear();


Mime
View raw message