ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ntikho...@apache.org
Subject [48/50] [abbrv] ignite git commit: ignite-426-2-reb WIP
Date Wed, 28 Oct 2015 13:15:24 GMT
ignite-426-2-reb WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/926a0013
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/926a0013
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/926a0013

Branch: refs/heads/ignite-426-2-reb
Commit: 926a0013724e1394fd02c601ab57c53aa7f217f8
Parents: b59d02d
Author: Tikhonov Nikolay <tikhonovnicolay@gmail.com>
Authored: Sun Oct 25 17:10:10 2015 +0300
Committer: nikolay_tikhonov <ntikhonov@gridgain.com>
Committed: Wed Oct 28 15:26:45 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     | 69 ++++++++++++++++----
 .../cache/GridCacheUpdateAtomicResult.java      | 17 ++++-
 .../dht/atomic/GridDhtAtomicCache.java          | 24 ++++++-
 .../continuous/CacheContinuousQueryManager.java |  6 ++
 ...acheContinuousQueryFailoverAbstractTest.java |  8 +--
 5 files changed, 106 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index abed98d..e842f61 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
 import org.apache.ignite.cache.CacheMemoryMode;
 import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo;
 import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -61,6 +62,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.T3;
@@ -1766,6 +1768,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         CacheObject oldVal;
         CacheObject updated;
 
+        if (!primary) {
+            int z = 0;
+
+            ++z;
+        }
+
         GridCacheVersion enqueueVer = null;
 
         GridCacheVersionConflictContext<?, ?> conflictCtx = null;
@@ -1784,6 +1792,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         Object updated0 = null;
 
         Long updateIdx0 = null;
+        CI1<IgniteInternalFuture<Void>> contQryNtf = null;
 
         synchronized (this) {
             boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter);
@@ -1893,7 +1902,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            null);
                     }
                     // Will update something.
                     else {
@@ -1970,8 +1980,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             if (updateIdx != null)
                                 updateIdx0 = updateIdx;
 
-                            cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false,
-                                updateIdx0, topVer);
+                            final boolean primary0 = primary;
+                            final CacheObject prevVal0 = prevVal;
+                            final CacheObject evtVal0 = evtVal;
+                            final AffinityTopologyVersion topVer0 = topVer;
+                            final long updateIdx00 = updateIdx0;
+
+                            contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
+                                @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
+                                    try {
+                                        cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0,
+                                                prevVal0, primary0, false, updateIdx00, topVer0);
+                                    }
+                                    catch (IgniteCheckedException e) {
+                                        // No-op.
+                                    }
+                                }
+                            };
                         }
 
                         return new GridCacheUpdateAtomicResult(false,
@@ -1983,7 +2008,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            contQryNtf);
                     }
                 }
                 else
@@ -2060,7 +2086,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateIdx0 == null ? 0 : updateIdx0);
+                        updateIdx0 == null ? 0 : updateIdx0,
+                        null);
                 }
             }
 
@@ -2108,7 +2135,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                         null,
                         null,
                         false,
-                        updateIdx0 == null ? 0 : updateIdx);
+                        updateIdx0 == null ? 0 : updateIdx,
+                        null);
                 }
             }
             else
@@ -2209,7 +2237,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            null);
                     else if (interceptorVal != updated0) {
                         updated0 = cctx.unwrapTemporary(interceptorVal);
 
@@ -2291,7 +2320,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
                             null,
                             null,
                             false,
-                            updateIdx0 == null ? 0 : updateIdx0);
+                            updateIdx0 == null ? 0 : updateIdx0,
+                            null);
                 }
 
                 if (writeThrough)
@@ -2377,8 +2407,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             if (res)
                 updateMetrics(op, metrics);
 
-            if (!isNear())
-                cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer);
+            if (!isNear()) {
+                final boolean primary0 = primary;
+                final CacheObject oldVal0 = oldVal;
+                final AffinityTopologyVersion topVer0 = topVer;
+                final long updateIdx00 = updateIdx0;
+
+                contQryNtf = new CI1<IgniteInternalFuture<Void>>() {
+                    @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) {
+                        try {
+                            cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0,
+                                false, updateIdx00, topVer0);
+                        }
+                        catch (IgniteCheckedException e) {
+                            // No-op.
+                        }
+                    }
+                };
+            }
 
             cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE);
 
@@ -2405,7 +2451,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
             enqueueVer,
             conflictCtx,
             true,
-            updateIdx0);
+            updateIdx0,
+            contQryNtf);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
index 092d990..9e2aca6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java
@@ -18,9 +18,12 @@
 package org.apache.ignite.internal.processors.cache;
 
 import javax.cache.processor.EntryProcessor;
+
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.Nullable;
@@ -63,6 +66,9 @@ public class GridCacheUpdateAtomicResult {
     /** Value computed by entry processor. */
     private IgniteBiTuple<Object, Exception> res;
 
+    /** Continuous query notify listener. */
+    private CI1<IgniteInternalFuture<Void>> contQryNtfy;
+
     /**
      * Constructor.
      *
@@ -86,7 +92,8 @@ public class GridCacheUpdateAtomicResult {
         @Nullable GridCacheVersion rmvVer,
         @Nullable GridCacheVersionConflictContext<?, ?> conflictRes,
         boolean sndToDht,
-        long updateIdx) {
+        long updateIdx,
+        @Nullable CI1<IgniteInternalFuture<Void>> contQryNtfy) {
         this.success = success;
         this.oldVal = oldVal;
         this.newVal = newVal;
@@ -97,6 +104,7 @@ public class GridCacheUpdateAtomicResult {
         this.conflictRes = conflictRes;
         this.sndToDht = sndToDht;
         this.updateIdx = updateIdx;
+        this.contQryNtfy = contQryNtfy;
     }
 
     /**
@@ -170,6 +178,13 @@ public class GridCacheUpdateAtomicResult {
         return sndToDht;
     }
 
+    /**
+     * @return Continuous notify closure.
+     */
+    public CI1<IgniteInternalFuture<Void>> contQryNtfy() {
+        return contQryNtfy;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheUpdateAtomicResult.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 46799d7..c6ab45d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1765,7 +1765,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
                 }
 
-                GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
+                final GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                     ver,
                     node.id(),
                     locNodeId,
@@ -1799,6 +1799,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     readersOnly = true;
                 }
 
+                if (!primary) {
+                    int z = 0;
+
+                    ++z;
+                }
+
+                if (updRes.contQryNtfy() != null) {
+                    if (primary && dhtFut != null) {
+                        dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() {
+                            @Override public void apply(IgniteInternalFuture<Void> f) {
+                                if (f.isDone() && f.error() == null)
+                                        updRes.contQryNtfy().apply(f);
+                                }
+                            });
+                    }
+                    else
+                        updRes.contQryNtfy().apply(null);
+                }
+
                 if (dhtFut != null) {
                     if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                         GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();
@@ -2561,6 +2580,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                         if (updRes.removeVersion() != null)
                             ctx.onDeferredDelete(entry, updRes.removeVersion());
 
+                        if (updRes.contQryNtfy() != null)
+                            updRes.contQryNtfy().apply(null);
+
                         entry.onUnlock();
 
                         break; // While.

http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index 14fe195..ecc778b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -189,6 +189,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter {
         if (preload && !internal)
             return;
 
+        if (!primary) {
+            int z = 0;
+
+            ++z;
+        }
+
         ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol;
 
         if (internal)

http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
index 6979f6a..90e21ad 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java
@@ -27,11 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -93,6 +89,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
 
 /**
  *
@@ -122,6 +119,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo
 
         TestCommunicationSpi commSpi = new TestCommunicationSpi();
 
+        commSpi.setSharedMemoryPort(-1);
         commSpi.setIdleConnectionTimeout(100);
 
         cfg.setCommunicationSpi(commSpi);


Mime
View raw message