ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: cc
Date Mon, 29 May 2017 06:03:43 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075-cc be43bf8fe -> ed2f8b3b7


cc


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ed2f8b3b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ed2f8b3b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ed2f8b3b

Branch: refs/heads/ignite-5075-cc
Commit: ed2f8b3b719c42f380b8b247b1b62d4aa1c73184
Parents: be43bf8
Author: sboikov <sboikov@gridgain.com>
Authored: Mon May 29 09:03:33 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon May 29 09:03:33 2017 +0300

----------------------------------------------------------------------
 .../CacheContinuousQueryEventBuffer.java        | 43 +++++++++++---------
 1 file changed, 23 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ed2f8b3b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
index 7d33614..74b3ff8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEventBuffer.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.IgniteSystemProperties;
@@ -78,37 +79,37 @@ public class CacheContinuousQueryEventBuffer {
      * @return Backup entries.
      */
     @Nullable Collection<CacheContinuousQueryEntry> flushOnExchange() {
-        Collection<CacheContinuousQueryEntry> ret = null;
-
-        Batch batch = curBatch.get();
-
-        if (batch != null)
-            ret = batch.flushCurrentEntries();
+        TreeMap<Long, CacheContinuousQueryEntry> ret = null;
 
         int size = backupQ.sizex();
 
         if (size > 0) {
-            if (ret == null)
-                ret = new ArrayList<>();
+            ret = new TreeMap<>();
 
             for (int i = 0; i < size; i++) {
                 CacheContinuousQueryEntry e = backupQ.pollFirst();
 
                 if (e != null)
-                    ret.add(e);
+                    ret.put(e.updateCounter(), e);
                 else
                     break;
             }
         }
 
+        Batch batch = curBatch.get();
+
+        if (batch != null)
+            ret = batch.flushCurrentEntries(ret);
+
         if (!pending.isEmpty()) {
             if (ret == null)
-                ret = new ArrayList<>();
+                ret = new TreeMap<>();
 
-            ret.addAll(pending.values());
+            for (CacheContinuousQueryEntry e : pending.values())
+                ret.put(e.updateCounter(), e);
         }
 
-        return ret;
+        return ret != null ? ret.values() : null;
     }
 
     /**
@@ -319,13 +320,13 @@ public class CacheContinuousQueryEventBuffer {
         }
 
         /**
+         * @param res Current entries.
          * @return Entries to send as part of backup queue.
          */
-        @Nullable synchronized List<CacheContinuousQueryEntry> flushCurrentEntries()
{
+        @Nullable synchronized TreeMap<Long, CacheContinuousQueryEntry> flushCurrentEntries(
+            @Nullable TreeMap<Long, CacheContinuousQueryEntry> res) {
             if (entries == null)
-                return null;
-
-            List<CacheContinuousQueryEntry> res = null;
+                return res;
 
             long filtered = this.filtered;
             long cntr = startCntr;
@@ -365,9 +366,9 @@ public class CacheContinuousQueryEventBuffer {
 
                 if (flushEntry != null) {
                     if (res == null)
-                        res = new ArrayList<>();
+                        res = new TreeMap<>();
 
-                    res.add(flushEntry);
+                    res.put(flushEntry.updateCounter(), flushEntry);
                 }
 
                 cntr++;
@@ -375,9 +376,11 @@ public class CacheContinuousQueryEventBuffer {
 
             if (filtered != 0L) {
                 if (res == null)
-                    res = new ArrayList<>();
+                    res = new TreeMap<>();
+
+                CacheContinuousQueryEntry flushEntry = filteredEntry(cntr - 1, filtered -
1);
 
-                res.add(filteredEntry(cntr - 1, filtered - 1));
+                res.put(flushEntry.updateCounter(), flushEntry);
             }
 
             return res;


Mime
View raw message