ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: - fixed lock order for interrupt lock/topology write lock - GridCacheTtlManager: disable near cache eviction logic for local cache
Date Fri, 30 Dec 2016 10:11:38 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3477 d139e6476 -> c66bb4f8f


- fixed lock order for interrupt lock/topology write lock
- GridCacheTtlManager: disable near cache eviction logic for local cache


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c66bb4f8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c66bb4f8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c66bb4f8

Branch: refs/heads/ignite-3477
Commit: c66bb4f8f6a85607b262e863eb66489a56905b8c
Parents: d139e64
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Dec 30 13:11:34 2016 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Dec 30 13:11:34 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheTtlManager.java   |  10 +-
 .../dht/GridDhtPartitionTopologyImpl.java       | 126 +++++++++----------
 .../GridDhtPartitionsExchangeFuture.java        |  38 ++++--
 3 files changed, 91 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
index a336a80..bc09066 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.GridConcurrentSkipListSet;
@@ -31,8 +29,6 @@ import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.LongAdder8;
 
@@ -40,7 +36,6 @@ import org.jsr166.LongAdder8;
  * Eagerly removes expired entries from cache when
  * {@link CacheConfiguration#isEagerTtl()} flag is set.
  */
-@SuppressWarnings("NakedNotify")
 public class GridCacheTtlManager extends GridCacheManagerAdapter {
     /** Entries pending removal. */
     private  GridConcurrentSkipListSetEx pendingEntries;
@@ -87,7 +82,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
 
         cctx.shared().ttl().register(this);
 
-        pendingEntries = cctx.config().getNearConfiguration() != null ? new GridConcurrentSkipListSetEx() : null;
+        pendingEntries = (!cctx.isLocal() && cctx.config().getNearConfiguration() != null) ? new GridConcurrentSkipListSetEx() : null;
     }
 
     /** {@inheritDoc} */
@@ -156,7 +151,6 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
     public boolean expire(int amount) {
         long now = U.currentTimeMillis();
 
-
         try {
             if (pendingEntries != null) {
                 GridNearCacheAdapter nearCache = cctx.near();
@@ -186,7 +180,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter {
             boolean more = cctx.offheap().expire(expireC, amount);
 
             if (more)
-                return more;
+                return true;
 
             if (amount != -1 && pendingEntries != null) {
                 EntryWrapper e = pendingEntries.firstx();

http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f22c263..0dd836d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -70,7 +70,8 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 /**
  * Partition topology.
  */
-@GridToStringExclude class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
+@GridToStringExclude
+class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
     /** If true, then check consistency. */
     private static final boolean CONSISTENCY_CHECK = false;
 
@@ -123,7 +124,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
     private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
 
     /** */
-    private volatile boolean treatAllPartitionAsLocal;
+    private volatile boolean treatAllPartAsLoc;
 
     /**
      * @param cctx Context.
@@ -490,7 +491,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         ClusterState newState = exchFut.newClusterState();
 
-        treatAllPartitionAsLocal = (newState != null && newState == ClusterState.ACTIVE)
+        treatAllPartAsLoc = (newState != null && newState == ClusterState.ACTIVE)
             || (cctx.kernalContext().state().active()
             && discoEvt.type() == EventType.EVT_NODE_JOINED
             && discoEvt.eventNode().isLocal()
@@ -504,64 +505,64 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
         cctx.shared().database().checkpointReadLock();
 
-        try {
-            U.writeLock(lock);
-        }
-        catch (IgniteInterruptedCheckedException e) {
-            cctx.shared().database().checkpointReadUnlock();
+        synchronized (cctx.shared().exchange().interruptLock()) {
+            if (Thread.currentThread().isInterrupted())
+                throw new IgniteCheckedException("Thread is interrupted: " + Thread.currentThread());
 
-            throw e;
-        }
+            try {
+                U.writeLock(lock);
+            }
+            catch (IgniteInterruptedCheckedException e) {
+                cctx.shared().database().checkpointReadUnlock();
 
-        try {
-            GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
+                throw e;
+            }
 
-            if (stopping)
-                return;
+            try {
+                GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
 
-            assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
-                topVer + ", exchId=" + exchId + ']';
+                if (stopping)
+                    return;
 
-            if (exchId.isLeft())
-                removeNode(exchId.nodeId());
+                assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" +
+                    topVer + ", exchId=" + exchId + ']';
 
-            ClusterNode oldest = currentCoordinator();
+                if (exchId.isLeft())
+                    removeNode(exchId.nodeId());
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
+                ClusterNode oldest = currentCoordinator();
 
-            long updateSeq = this.updateSeq.incrementAndGet();
+                if (log.isDebugEnabled())
+                    log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
 
-            cntrMap.clear();
+                long updateSeq = this.updateSeq.incrementAndGet();
 
-            // If this is the oldest node.
-            if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
-                if (node2part == null) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
+                cntrMap.clear();
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created brand new full topology map on oldest node [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-                else if (!node2part.valid()) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+                // If this is the oldest node.
+                if (oldest != null && (loc.equals(oldest) || exchFut.isCacheAdded(cctx.cacheId(), exchId.topologyVersion()))) {
+                    if (node2part == null) {
+                        node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
-                            node2part + ']');
-                }
-                else if (!node2part.nodeId().equals(loc.id())) {
-                    node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
+                        if (log.isDebugEnabled())
+                            log.debug("Created brand new full topology map on oldest node [exchId=" +
+                                exchId + ", fullMap=" + fullMapString() + ']');
+                    }
+                    else if (!node2part.valid()) {
+                        node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 
-                    if (log.isDebugEnabled())
-                        log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
-                            exchId + ", fullMap=" + fullMapString() + ']');
-                }
-            }
+                        if (log.isDebugEnabled())
+                            log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
+                                node2part + ']');
+                    }
+                    else if (!node2part.nodeId().equals(loc.id())) {
+                        node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
 
-            synchronized (cctx.shared().exchange().interruptLock()) {
-                if (Thread.currentThread().isInterrupted())
-                    throw new IgniteCheckedException("Thread is interrupted: " + Thread.currentThread());
+                        if (log.isDebugEnabled())
+                            log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
+                                exchId + ", fullMap=" + fullMapString() + ']');
+                    }
+                }
 
                 if (affReady)
                     initPartitions0(exchFut, updateSeq);
@@ -570,18 +571,18 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
                     createPartitions(aff, updateSeq);
                 }
-            }
 
-            consistencyCheck();
+                consistencyCheck();
 
-            if (log.isDebugEnabled())
-                log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
-                    fullMapString() + ']');
-        }
-        finally {
-            lock.writeLock().unlock();
+                if (log.isDebugEnabled())
+                    log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
+                        fullMapString() + ']');
+            }
+            finally {
+                lock.writeLock().unlock();
 
-            cctx.shared().database().checkpointReadUnlock();
+                cctx.shared().database().checkpointReadUnlock();
+            }
         }
 
         // Wait for evictions.
@@ -590,7 +591,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
 
     /** {@inheritDoc} */
     @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
-        treatAllPartitionAsLocal = false;
+        treatAllPartAsLoc = false;
 
         boolean changed = waitForRent();
 
@@ -767,17 +768,16 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             if (loc != null && state == EVICTED) {
                 locParts.set(p, loc = null);
 
-                if (!treatAllPartitionAsLocal && !belongs)
+                if (!treatAllPartAsLoc && !belongs)
                     throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
                         "(often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
             }
-            else if (loc != null && state == RENTING && cctx.allowFastEviction()) {
+            else if (loc != null && state == RENTING && cctx.allowFastEviction())
                     throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently evicted.");
-            }
 
             if (loc == null) {
-                if (!treatAllPartitionAsLocal && !belongs)
+                if (!treatAllPartAsLoc && !belongs)
                     throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
                         "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
                         "[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
@@ -989,7 +989,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
             List<ClusterNode> nodes = new ArrayList<>(size);
 
             for (UUID id : nodeIds) {
-                if (topVer.topologyVersion() > 0 && !allIds.contains(id))
+                if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
                     continue;
 
                 if (hasState(p, id, state, states)) {
@@ -1905,7 +1905,7 @@ import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDh
                     continue;
 
                 if (cntr0 == null || cntr1 > cntr0.get1())
-                    res.put(part.id(), new T2<Long, Long>(cntr1, part.updateCounter()));
+                    res.put(part.id(), new T2<>(cntr1, part.updateCounter()));
             }
 
             return res;

http://git-wip-us.apache.org/repos/asf/ignite/blob/c66bb4f8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index eb98fa6..53c2dbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1491,18 +1491,6 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
         }
     }
 
-    private static class CounterWithNodes {
-        private final long cnt;
-
-        private final Set<UUID> nodes = new HashSet<>();
-
-        private CounterWithNodes(long cnt, UUID firstNode) {
-            this.cnt = cnt;
-
-            nodes.add(firstNode);
-        }
-    }
-
     /**
      * Detect lost partitions.
      */
@@ -2048,4 +2036,30 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT
             "srvNodes", srvNodes,
             "super", super.toString());
     }
+
+    /**
+     *
+     */
+    private static class CounterWithNodes {
+        /** */
+        private final long cnt;
+
+        /** */
+        private final Set<UUID> nodes = new HashSet<>();
+
+        /**
+         * @param cnt Count.
+         * @param firstNode Node ID.
+         */
+        private CounterWithNodes(long cnt, UUID firstNode) {
+            this.cnt = cnt;
+
+            nodes.add(firstNode);
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(CounterWithNodes.class, this);
+        }
+    }
 }


Mime
View raw message