ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [02/13] ignite git commit: IGNITE-2822 Fixed not effective approach.
Date Fri, 08 Apr 2016 17:38:13 GMT
IGNITE-2822 Fixed not effective approach.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da47901f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da47901f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da47901f

Branch: refs/heads/ignite-2004
Commit: da47901f07fd8c7fa47b3238bb37e083c93dfdc4
Parents: 21f5d0f
Author: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Authored: Wed Apr 6 19:29:24 2016 +0300
Committer: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Committed: Wed Apr 6 19:29:24 2016 +0300

----------------------------------------------------------------------
 .../continuous/CacheContinuousQueryHandler.java | 84 ++++++++------------
 1 file changed, 33 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/da47901f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 767697a..3576424 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -363,27 +363,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     if (primary || skipPrimaryCheck) {
                         if (loc) {
                             if (!locCache) {
-                                Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx,
entry);
+                                Collection<CacheEntryEvent<? extends K, ? extends V>>
entries = handleEvent(ctx, entry);
 
                                 if (!entries.isEmpty()) {
-                                    final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
-
-                                    Iterable<CacheEntryEvent<? extends K, ? extends
V>> evts = F.viewReadOnly(entries,
-                                        new C1<CacheContinuousQueryEntry, CacheEntryEvent<?
extends K, ? extends V>>() {
-                                            @Override public CacheEntryEvent<? extends
K, ? extends V> apply(
-                                                CacheContinuousQueryEntry e) {
-                                                return new CacheContinuousQueryEvent<>(cache,
cctx, e);
-                                            }
-                                        },
-                                        new IgnitePredicate<CacheContinuousQueryEntry>()
{
-                                            @Override public boolean apply(CacheContinuousQueryEntry
entry) {
-                                                return !entry.isFiltered();
-                                            }
-                                        }
-                                    );
-
-                                    if (!F.isEmpty(evts))
-                                        locLsnr.onUpdated(evts);
+                                    locLsnr.onUpdated(entries);
 
                                     if (!internal && !skipPrimaryCheck)
                                         sendBackupAcknowledge(ackBuf.onAcknowledged(entry),
routineId, ctx);
@@ -607,7 +590,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
         final GridCacheContext cctx = cacheContext(ctx);
 
-        Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>();
+        Collection<CacheEntryEvent<? extends K, ? extends V>> entries0 = new
ArrayList<>();
 
         for (CacheContinuousQueryEntry e : entries) {
             GridCacheDeploymentManager depMgr = cctx.deploy();
@@ -636,24 +619,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
             }
         }
 
-        final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
-
-        if (!entries0.isEmpty()) {
-            Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0,
-                new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? extends
V>>() {
-                    @Override public CacheEntryEvent<? extends K, ? extends V> apply(CacheContinuousQueryEntry
e) {
-                        return new CacheContinuousQueryEvent<>(cache, cctx, e);
-                    }
-                },
-                new IgnitePredicate<CacheContinuousQueryEntry>() {
-                    @Override public boolean apply(CacheContinuousQueryEntry entry) {
-                        return !entry.isFiltered();
-                    }
-                }
-            );
-
-            locLsnr.onUpdated(evts);
-        }
+        if (!entries0.isEmpty())
+            locLsnr.onUpdated(entries0);
     }
 
     /**
@@ -661,24 +628,30 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
      * @param e entry.
      * @return Entry collection.
      */
-    private Collection<CacheContinuousQueryEntry> handleEvent(GridKernalContext ctx,
+    private Collection<CacheEntryEvent<? extends K, ? extends V>> handleEvent(GridKernalContext
ctx,
         CacheContinuousQueryEntry e) {
         assert e != null;
 
+        GridCacheContext<K, V> cctx = cacheContext(ctx);
+
+        final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name());
+
         if (internal) {
             if (e.isFiltered())
                 return Collections.emptyList();
             else
-                return F.asList(e);
+                return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
+                    new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
         }
 
         // Initial query entry or evicted entry. These events should be fired immediately.
         if (e.updateCounter() == -1L)
-            return F.asList(e);
+            return F.<CacheEntryEvent<? extends K, ? extends V>>asList(
+                new CacheContinuousQueryEvent<K, V>(cache, cctx, e));
 
         PartitionRecovery rec = getOrCreatePartitionRecovery(ctx, e.partition());
 
-        return rec.collectEntries(e);
+        return rec.collectEntries(cctx, cache, e);
     }
 
     /**
@@ -802,19 +775,24 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         /**
          * Add continuous entry.
          *
+         * @param cctx Cache context.
+         * @param cache Cache.
          * @param entry Cache continuous query entry.
-         * @return Collection entries which will be fired.
+         * @return Collection entries which will be fired. This collection should contains
only non-filtered events.
          */
-        public Collection<CacheContinuousQueryEntry> collectEntries(CacheContinuousQueryEntry
entry) {
+        public <K, V> Collection<CacheEntryEvent<? extends K, ? extends V>>
collectEntries(GridCacheContext cctx,
+            IgniteCache cache,
+            CacheContinuousQueryEntry entry) {
             assert entry != null;
 
             if (entry.topologyVersion() == null) { // Possible if entry is sent from old
node.
                 assert entry.updateCounter() == 0L : entry;
 
-                return F.asList(entry);
+                return F.<CacheEntryEvent<? extends K, ? extends V>>
+                    asList(new CacheContinuousQueryEvent<K, V>(cache, cctx, entry));
             }
 
-            List<CacheContinuousQueryEntry> entries;
+            List<CacheEntryEvent<? extends K, ? extends V>> entries;
 
             synchronized (pendingEvts) {
                 // Received first event.
@@ -823,7 +801,10 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                     curTop = entry.topologyVersion();
 
-                    return F.asList(entry);
+                    return !entry.isFiltered() ?
+                        F.<CacheEntryEvent<? extends K, ? extends V>>
+                            asList(new CacheContinuousQueryEvent<K, V>(cache, cctx,
entry)) :
+                        Collections.<CacheEntryEvent<? extends K, ? extends V>>emptyList();
                 }
 
                 if (curTop.compareTo(entry.topologyVersion()) < 0) {
@@ -832,7 +813,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                         for (CacheContinuousQueryEntry evt : pendingEvts.values()) {
                             if (evt != HOLE && !evt.isFiltered())
-                                entries.add(evt);
+                                entries.add(new CacheContinuousQueryEvent<K, V>(cache,
cctx, evt));
                         }
 
                         pendingEvts.clear();
@@ -841,7 +822,8 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
 
                         lastFiredEvt = entry.updateCounter();
 
-                        entries.add(entry);
+                        if (!entry.isFiltered())
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache,
cctx, entry));
 
                         return entries;
                     }
@@ -880,7 +862,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                         Map.Entry<Long, CacheContinuousQueryEntry> e = iter.next();
 
                         if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                            entries.add(e.getValue());
+                            entries.add(new CacheContinuousQueryEvent<K, V>(cache,
cctx, e.getValue()));
 
                         lastFiredEvt = e.getKey();
 
@@ -896,7 +878,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                             ++lastFiredEvt;
 
                             if (e.getValue() != HOLE && !e.getValue().isFiltered())
-                                entries.add(e.getValue());
+                                entries.add(new CacheContinuousQueryEvent<K, V>(cache,
cctx, e.getValue()));
 
                             iter.remove();
                         }


Mime
View raw message