ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: cc
Date Fri, 26 May 2017 04:01:49 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc e36163fc4 -> 01f45c1b8


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01f45c1b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01f45c1b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01f45c1b

Branch: refs/heads/ignite-5075-cc
Commit: 01f45c1b8e2dcb2a5eea0596a55c434f682cc60b
Parents: e36163f
Author: sboikov <sboikov@gridgain.com>
Authored: Fri May 26 07:01:40 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri May 26 07:01:40 2017 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        |  16 +-
 .../CacheContinuousQueryEventBuffer.java        | 150 ++++++++++++-------
 .../continuous/CacheContinuousQueryHandler.java |   4 +-
 3 files changed, 107 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/01f45c1b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 544f847..8b8c87c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -789,14 +789,11 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
 
         boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg
!= null;
 
-        //todo check
         for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
             if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
                 continue;
 
             if (topChanged) {
-                cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
-
                 // Partition release future is done so we can flush the write-behind store.
                 cacheCtx.store().forceFlush();
             }
@@ -1101,10 +1098,23 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
+    private boolean serverNotDiscoveryEvent() {
+        return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode());
+    }
+
     /** {@inheritDoc} */
     @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable
err) {
         boolean realExchange = !dummy && !forcePreload;
 
+        if (realExchange && !cctx.kernalContext().clientNode() && (serverNotDiscoveryEvent()
|| affChangeMsg != null)) {
+            for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+                if (cacheCtx.isLocal())
+                    continue;
+
+                cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion());
+            }
+       }
+
         if (err == null && realExchange) {
             for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
                 if (cacheCtx.isLocal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/01f45c1b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index fd4029c..c59b851 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -40,6 +40,9 @@ public class CacheContinuousQueryEventBuffer {
         IgniteSystemProperties.getInteger("IGNITE_CONTINUOUS_QUERY_SERVER_BUFFER_SIZE", 5);
 
     /** */
+    private static final Object RETRY = new Object();
+
+    /** */
     protected final int part;
 
     /** */
@@ -75,32 +78,42 @@ public class CacheContinuousQueryEventBuffer {
     /**
      * @return Backup entries.
      */
-    @Nullable Collection<CacheContinuousQueryEntry> resetBackupQueue() {
-        Collection<CacheContinuousQueryEntry> ret;
+    @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() {
+        Collection<CacheContinuousQueryEntry> ret = null;
 
-        List<CacheContinuousQueryEntry> entries = null;
+        for (;;) {
+            Batch batch = curBatch.get();
 
-        Batch batch = curBatch.get();
+            if (batch != null) {
+                Collection<CacheContinuousQueryEntry> ret0 = batch.flushAndReset();
 
-        if (batch != null)
-            entries = batch.backupFlushEntries();
+                if (ret0 != null) {
+                    if (ret == null)
+                        ret = ret0;
+                    else
+                        ret.addAll(ret0);
+                }
+            }
 
-        if (!backupQ.isEmpty()) {
-            if (entries != null)
-                backupQ.addAll(entries);
+            if (!backupQ.isEmpty()) {
+                if (ret == null)
+                    ret = new ArrayList<>();
 
-            ret = this.backupQ;
+                CacheContinuousQueryEntry e;
 
-            backupQ = new ConcurrentLinkedDeque<>();
-        }
-        else
-            ret = entries;
+                while ((e = backupQ.pollFirst()) != null)
+                    ret.add(e);
+            }
 
-        if (!pending.isEmpty()) {
-            if (ret == null)
-                ret = new ArrayList<>();
+            if (!pending.isEmpty()) {
+                if (ret == null)
+                    ret = new ArrayList<>();
 
-            ret.addAll(pending.values());
+                ret.addAll(pending.values());
+            }
+
+            if (curBatch.compareAndSet(batch, null))
+                break;
         }
 
         return ret;
@@ -142,21 +155,30 @@ public class CacheContinuousQueryEventBuffer {
     private Object process0(long cntr, CacheContinuousQueryEntry entry, boolean backup) {
         assert cntr >= 0 : cntr;
 
-        Batch batch = initBatch(entry.topologyVersion());
+        Batch batch;
+        Object res = null;
 
-        if (batch == null || cntr < batch.startCntr) {
-            if (backup)
-                backupQ.add(entry);
+        for (;;) {
+            batch = initBatch(entry.topologyVersion());
 
-            return entry;
-        }
+            if (batch == null || cntr < batch.startCntr) {
+                if (backup)
+                    backupQ.add(entry);
 
-        Object res = null;
+                return entry;
+            }
+
+            if (cntr <= batch.endCntr) {
+                res = batch.processEntry0(null, cntr, entry, backup);
+
+                if (res == RETRY)
+                    continue;
+            }
+            else
+                pending.put(cntr, entry);
 
-        if (cntr <= batch.endCntr)
-            res = batch.processEvent0(null, cntr, entry, backup);
-        else
-            pending.put(cntr, entry);
+            break;
+        }
 
         Batch batch0 = curBatch.get();
 
@@ -166,7 +188,7 @@ public class CacheContinuousQueryEventBuffer {
 
                 res = processPending(res, batch, backup);
 
-                batch0 = curBatch.get();
+                batch0 = initBatch(entry.topologyVersion());
             }
             while (batch != batch0);
         }
@@ -184,17 +206,22 @@ public class CacheContinuousQueryEventBuffer {
         if (batch != null)
             return batch;
 
-        long curCntr = currentPartitionCounter();
+        for (;;) {
+            long curCntr = currentPartitionCounter();
 
-        if (curCntr == -1)
-            return null;
+            if (curCntr == -1)
+                return null;
 
-        batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);
+            batch = new Batch(curCntr + 1, 0L, new CacheContinuousQueryEntry[BUF_SIZE], topVer);
 
-        if (curBatch.compareAndSet(null, batch))
-            return batch;
+            if (curBatch.compareAndSet(null, batch))
+                return batch;
+
+            batch = curBatch.get();
 
-        return curBatch.get();
+            if (batch != null)
+                return batch;
+        }
     }
 
     /**
@@ -211,7 +238,7 @@ public class CacheContinuousQueryEventBuffer {
                 assert cntr >= batch.startCntr && cntr <= batch.endCntr : cntr;
 
                 if (pending.remove(p.getKey()) != null)
-                    res = batch.processEvent0(res, p.getKey(), p.getValue(), backup);
+                    res = batch.processEntry0(res, p.getKey(), p.getValue(), backup);
             }
         }
 
@@ -235,7 +262,7 @@ public class CacheContinuousQueryEventBuffer {
         private int lastProc = -1;
 
         /** */
-        private final CacheContinuousQueryEntry[] entries;
+        private CacheContinuousQueryEntry[] entries;
 
         /** */
         private final AffinityTopologyVersion topVer;
@@ -261,7 +288,10 @@ public class CacheContinuousQueryEventBuffer {
         /**
          * @return Entries to send as part of backup queue.
          */
-        @Nullable synchronized List<CacheContinuousQueryEntry> backupFlushEntries()
{
+        @Nullable synchronized List<CacheContinuousQueryEntry> flushAndReset() {
+            if (entries == null)
+                return null;
+
             List<CacheContinuousQueryEntry> res = null;
 
             long filtered = this.filtered;
@@ -283,15 +313,7 @@ public class CacheContinuousQueryEventBuffer {
                     if (e.isFiltered())
                         filtered++;
                     else {
-                        flushEntry = new CacheContinuousQueryEntry(e.cacheId(),
-                            e.eventType(),
-                            e.key(),
-                            e.value(),
-                            e.oldValue(),
-                            e.isKeepBinary(),
-                            e.partition(),
-                            e.updateCounter(),
-                            e.topologyVersion());
+                        flushEntry = e;
 
                         flushEntry.filteredCount(filtered);
 
@@ -316,6 +338,8 @@ public class CacheContinuousQueryEventBuffer {
                 res.add(filteredEntry(cntr - 1, filtered - 1));
             }
 
+            entries = null;
+
             return res;
         }
 
@@ -350,7 +374,7 @@ public class CacheContinuousQueryEventBuffer {
          * @return New result.
          */
         @SuppressWarnings("unchecked")
-        @Nullable private Object processEvent0(
+        @Nullable private Object processEntry0(
             @Nullable Object res,
             long cntr,
             CacheContinuousQueryEntry entry,
@@ -358,6 +382,9 @@ public class CacheContinuousQueryEventBuffer {
             int pos = (int)(cntr - startCntr);
 
             synchronized (this) {
+                if (entries == null)
+                    return RETRY;
+
                 entries[pos] = entry;
 
                 int next = lastProc + 1;
@@ -409,17 +436,24 @@ public class CacheContinuousQueryEventBuffer {
                     }
 
                     lastProc = pos;
-                }
-                else
-                    return res;
-            }
 
-            if (pos == entries.length -1) {
-                Arrays.fill(entries, null);
+                    if (pos == entries.length - 1) {
+                        Arrays.fill(entries, null);
+
+                        Batch nextBatch = new Batch(this.startCntr + BUF_SIZE,
+                            filtered,
+                            entries,
+                            entry.topologyVersion());
+
+                        entries = null;
 
-                Batch nextBatch = new Batch(this.startCntr + BUF_SIZE, filtered, entries,
entry.topologyVersion());
+                        assert curBatch.get() == this;
 
-                curBatch.set(nextBatch);
+                        curBatch.set(nextBatch);
+                    }
+                }
+                else
+                    return res;
             }
 
             return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/01f45c1b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index b4f2dbd..ebfbe4f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -452,7 +452,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
                     for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE :
entryBufs.entrySet()) {
                         CacheContinuousQueryEventBuffer buf = bufE.getValue();
 
-                        Collection<CacheContinuousQueryEntry> backupQueue = buf.resetBackupQueue();
+                        Collection<CacheContinuousQueryEntry> backupQueue = buf.flushOnExchange();
 
                         if (backupQueue != null && node != null) {
                             for (CacheContinuousQueryEntry e : backupQueue) {
@@ -958,7 +958,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
         for (Map.Entry<Integer, CacheContinuousQueryEventBuffer> bufE : entryBufs.entrySet())
{
             CacheContinuousQueryEventBuffer buf = bufE.getValue();
 
-            buf.resetBackupQueue();
+            buf.flushOnExchange();
         }
     }
 


Mime
View raw message