ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-5075
Date Tue, 16 May 2017 09:42:12 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-5075 f04db168a -> bd0171579


ignite-5075


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/bd017157
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/bd017157
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/bd017157

Branch: refs/heads/ignite-5075
Commit: bd017157997163a462218232608d70c3a437d96c
Parents: f04db16
Author: sboikov <sboikov@gridgain.com>
Authored: Tue May 16 12:42:06 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue May 16 12:42:06 2017 +0300

----------------------------------------------------------------------
 .../discovery/GridDiscoveryManager.java         |   2 +-
 .../cache/CacheGroupInfrastructure.java         | 170 ++++++++++++++++---
 .../processors/cache/GridCacheContext.java      |  41 ++++-
 .../processors/cache/GridCacheEventManager.java |  36 ----
 .../GridCachePartitionExchangeManager.java      |  22 +--
 .../cache/GridCachePreloaderAdapter.java        |   3 +-
 .../dht/GridClientPartitionTopology.java        |   5 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  32 ++--
 .../dht/GridDhtPartitionTopologyImpl.java       |  28 +--
 .../dht/preloader/GridDhtPartitionDemander.java |  69 ++------
 .../dht/preloader/GridDhtPreloader.java         |  25 ++-
 .../query/h2/database/H2PkHashIndex.java        |   1 -
 12 files changed, 257 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index bf48cf3..fb6637a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -2692,7 +2692,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi>
{
     }
 
     /**
-     * TODO IGNTIE-5075: also store list of started caches.
+     *
      */
     private static class CacheGroupAffinity {
         /** */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
index 816993b..ed4ba46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java
@@ -22,8 +22,10 @@ import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataPageEvictionMode;
+import org.apache.ignite.events.CacheRebalancingEvent;
 import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
@@ -40,13 +42,16 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.typedef.CI1;
+import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheMode.LOCAL;
 import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 
 /**
@@ -206,31 +211,157 @@ public class CacheGroupInfrastructure {
      * @param cctx Cache context.
      */
     private void addCacheContext(GridCacheContext cctx) {
-        assert sharedGroup() || caches.isEmpty();
+        synchronized (caches) {
+            assert sharedGroup() || caches.isEmpty();
 
-        boolean add = caches.add(cctx);
+            boolean add = caches.add(cctx);
 
-        assert add : cctx.name();
+            assert add : cctx.name();
+        }
     }
 
     /**
      * @param cctx Cache context.
      */
     private void removeCacheContext(GridCacheContext cctx) {
-        assert sharedGroup() || caches.size() == 1 : caches.size();
+        synchronized (caches) {
+            assert sharedGroup() || caches.size() == 1 : caches.size();
 
-        boolean rmv = caches.remove(cctx);
+            boolean rmv = caches.remove(cctx);
 
-        assert rmv : cctx.name();
+            assert rmv : cctx.name();
+        }
     }
 
     /**
      * @return Cache context if group contains single cache.
      */
     public GridCacheContext singleCacheContext() {
-        assert !sharedGroup() && caches.size() == 1;
+        synchronized (caches) {
+            assert !sharedGroup() && caches.size() == 1;
+
+            return caches.get(0);
+        }
+    }
+
+    /**
+     *
+     */
+    public void unwindUndeploys() {
+        synchronized (caches) {
+            for (int i = 0; i < caches.size(); i++) {
+                GridCacheContext cctx = caches.get(i);
 
-        return caches.get(0);
+                cctx.deploy().unwind(cctx);
+            }
+        }
+    }
+
+    /**
+     * @param type Event type to check.
+     * @return {@code True} if given event type should be recorded.
+     */
+    public boolean eventRecordable(int type) {
+        return ctx.gridEvents().isRecordable(type);
+    }
+
+    /**
+     * Adds preloading event.
+     *
+     * @param part Partition.
+     * @param type Event type.
+     * @param discoNode Discovery node.
+     * @param discoType Discovery event type.
+     * @param discoTs Discovery event timestamp.
+     */
+    public void addRebalanceEvent(int part, int type, ClusterNode discoNode, int discoType,
long discoTs) {
+        assert discoNode != null;
+        assert type > 0;
+        assert discoType > 0;
+        assert discoTs > 0;
+
+        if (!eventRecordable(type))
+            LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));
+
+        synchronized (caches) {
+            for (int i = 0; i < caches.size(); i++) {
+                GridCacheContext cctx = caches.get(i);
+
+                if (cctx.recordEvent(type)) {
+                    cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                        cctx.localNode(),
+                        "Cache rebalancing event.",
+                        type,
+                        part,
+                        discoNode,
+                        discoType,
+                        discoTs));
+                }
+            }
+        }
+    }
+    /**
+     * Adds partition unload event.
+     *
+     * @param part Partition.
+     */
+    public void addUnloadEvent(int part) {
+        if (!eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+            LT.warn(log, "Added event without checking if event is recordable: " +
+                U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
+
+        synchronized (caches) {
+            for (int i = 0; i < caches.size(); i++) {
+                GridCacheContext cctx = caches.get(i);
+
+                cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(),
+                    cctx.localNode(),
+                    "Cache unloading event.",
+                    EVT_CACHE_REBALANCE_PART_UNLOADED,
+                    part,
+                    null,
+                    0,
+                    0));
+            }
+        }
+    }
+
+    public void addCacheEvent(
+        int part,
+        KeyCacheObject key,
+        UUID evtNodeId,
+        @Nullable IgniteUuid xid,
+        @Nullable Object lockId,
+        int type,
+        @Nullable CacheObject newVal,
+        boolean hasNewVal,
+        @Nullable CacheObject oldVal,
+        boolean hasOldVal,
+        UUID subjId,
+        @Nullable String cloClsName,
+        @Nullable String taskName,
+        boolean keepBinary
+    ) {
+        synchronized (caches) {
+            for (int i = 0; i < caches.size(); i++) {
+                GridCacheContext cctx = caches.get(i);
+
+                cctx.events().addEvent(part,
+                    key,
+                    evtNodeId,
+                    xid,
+                    lockId,
+                    type,
+                    newVal,
+                    hasNewVal,
+                    oldVal,
+                    hasOldVal,
+                    subjId,
+                    cloClsName,
+                    taskName,
+                    keepBinary);
+            }
+        }
     }
 
     // TODO IGNITE-5075: need separate caches with/without queries?
@@ -238,10 +369,6 @@ public class CacheGroupInfrastructure {
         return QueryUtils.isEnabled(ccfg);
     }
 
-    public boolean started() {
-        return true; // TODO IGNITE-5075.
-    }
-
     /**
      * @return Free List.
      */
@@ -257,7 +384,6 @@ public class CacheGroupInfrastructure {
     }
 
     /**
-     * TODO IGNITE-5075: get rid of CacheObjectContext?
      * @return Cache object context.
      */
     public CacheObjectContext cacheObjectContext() {
@@ -418,22 +544,26 @@ public class CacheGroupInfrastructure {
      * @return {@code True} if group contains caches.
      */
     boolean hasCaches() {
-        return !caches.isEmpty();
+        synchronized (caches) {
+            return !caches.isEmpty();
+        }
     }
 
     /**
      * @param part Partition ID.
      */
     public void onPartitionEvicted(int part) {
-        for (int i = 0; i < caches.size(); i++) {
-            GridCacheContext cctx = caches.get(i);
+        synchronized (caches) {
+            for (int i = 0; i < caches.size(); i++) {
+                GridCacheContext cctx = caches.get(i);
 
-            if (cctx.isDrEnabled())
-                cctx.dr().partitionEvicted(part);
+                if (cctx.isDrEnabled())
+                    cctx.dr().partitionEvicted(part);
 
-            cctx.continuousQueries().onPartitionEvicted(part);
+                cctx.continuousQueries().onPartitionEvicted(part);
 
-            cctx.dataStructures().onPartitionEvicted(part);
+                cctx.dataStructures().onPartitionEvicted(part);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 58a3775..47ff283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -45,6 +45,7 @@ import org.apache.ignite.cache.affinity.AffinityKeyMapper;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.EventType;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
@@ -56,8 +57,6 @@ import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
 import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.database.MemoryPolicy;
-import org.apache.ignite.internal.processors.cache.database.freelist.FreeList;
-import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
@@ -106,9 +105,10 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
-import static org.apache.ignite.cache.CacheRebalanceMode.NONE;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
+import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
 
 /**
@@ -249,6 +249,12 @@ public class GridCacheContext<K, V> implements Externalizable {
     /** */
     private boolean customAffMapper;
 
+    /** Whether {@link EventType#EVT_CACHE_REBALANCE_STARTED} was sent (used only for REPLICATED
cache). */
+    private volatile boolean rebalanceStartedEvtSent;
+
+    /** Whether {@link EventType#EVT_CACHE_REBALANCE_STOPPED} was sent (used only for REPLICATED
cache). */
+    private volatile boolean rebalanceStoppedEvtSent;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -2066,6 +2072,35 @@ public class GridCacheContext<K, V> implements Externalizable
{
             || (top.partitionState(localNodeId(), part) == OWNING);
     }
 
+    /**
+     * @param type Event type.
+     * @return {@code True} if event should be recorded.
+     */
+    public boolean recordEvent(int type) {
+        if (isReplicated()) {
+            if (type == EVT_CACHE_REBALANCE_STARTED) {
+                if (!rebalanceStartedEvtSent) {
+                    rebalanceStartedEvtSent = true;
+
+                    return true;
+                }
+                else
+                    return false;
+            }
+            else if (type == EVT_CACHE_REBALANCE_STOPPED) {
+                if (!rebalanceStoppedEvtSent) {
+                    rebalanceStoppedEvtSent = true;
+
+                    return true;
+                }
+                else
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
     /** {@inheritDoc} */
     @Override public void writeExternal(ObjectOutput out) throws IOException {
         U.writeString(out, igniteInstanceName());

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 687b132..7a5dea6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -368,42 +368,6 @@ public class GridCacheEventManager extends GridCacheManagerAdapter {
     }
 
     /**
-     * Adds preloading event.
-     *
-     * @param part Partition.
-     * @param type Event type.
-     * @param discoNode Discovery node.
-     * @param discoType Discovery event type.
-     * @param discoTs Discovery event timestamp.
-     */
-    public void addPreloadEvent(int part, int type, ClusterNode discoNode, int discoType,
long discoTs) {
-        assert discoNode != null;
-        assert type > 0;
-        assert discoType > 0;
-        assert discoTs > 0;
-
-        if (!cctx.events().isRecordable(type))
-            LT.warn(log, "Added event without checking if event is recordable: " + U.gridEventName(type));
-
-        cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(),
-            "Cache rebalancing event.", type, part, discoNode, discoType, discoTs));
-    }
-
-    /**
-     * Adds partition unload event.
-     *
-     * @param part Partition.
-     */
-    public void addUnloadEvent(int part) {
-        if (!cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
-            LT.warn(log, "Added event without checking if event is recordable: " +
-                U.gridEventName(EVT_CACHE_REBALANCE_PART_UNLOADED));
-
-        cctx.gridEvents().record(new CacheRebalancingEvent(cctx.name(), cctx.localNode(),
-            "Cache unloading event.", EVT_CACHE_REBALANCE_PART_UNLOADED, part, null, 0, 0));
-    }
-
-    /**
      * @param type Event type.
      * @return {@code True} if event is recordable.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index a666297..7eb6824 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -899,31 +899,23 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana
 
         for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) {
             if (!grp.isLocal()) {
-                boolean ready;
+                GridAffinityAssignmentCache affCache = grp.affinity();
 
-                if (exchId != null) {
-                    AffinityTopologyVersion startTopVer = grp.localStartVersion();
-
-                    ready = startTopVer.compareTo(exchId.topologyVersion()) <= 0;
-                }
-                else
-                    ready = grp.started();
+                GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
 
-                if (ready) {
-                    GridAffinityAssignmentCache affCache = grp.affinity();
-
-                    GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true);
+                assert locMap != null || exchId == null : grp.nameForLog();
 
+                if (locMap != null) {
                     addFullPartitionsMap(m,
                         dupData,
                         compress,
                         grp.groupId(),
                         locMap,
                         affCache.similarAffinityKey());
-
-                    if (exchId != null)
-                        m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
                 }
+
+                if (exchId != null)
+                    m.addPartitionUpdateCounters(grp.groupId(), grp.topology().updateCounters(true));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index d005aae..9ca4852 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -122,8 +122,7 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
 
     /** {@inheritDoc} */
     @Override public void unwindUndeploys() {
-        // TODO IGNITE-5075.
-        // cctx.deploy().unwind(cctx);
+        grp.unwindUndeploys();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 7c2248c..e94415c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -542,7 +542,10 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology
{
         lock.readLock().lock();
 
         try {
-            assert node2part != null && node2part.valid() : "Invalid node2part [node2part:
" + node2part +
+            if (stopping || node2part == null)
+                return null;
+
+            assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
                 ", locNodeId=" + cctx.localNodeId() +
                 ", igniteInstanceName=" + cctx.igniteInstanceName() + ']';
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 03959c7..ac8316b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -951,8 +951,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl
implements
     public void clearAll() throws NodeStoppingException {
         GridCacheVersion clearVer = ctx.versions().next();
 
-        // TODO IGNITE-5075.
-        boolean rec = grp.shared().gridEvents().isRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
+        boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
 
         Collection<ConcurrentMap<KeyCacheObject, GridCacheMapEntry>> maps =
             grp.sharedGroup() ? cachesEntryMaps.values() : Collections.singleton(singleCacheEntryMap);
@@ -975,21 +974,20 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl
implements
 
                         if (!cached.isInternal()) {
                             if (rec) {
-                                // TODO IGNITE-5075.
-//                            cctx.events().addEvent(cached.partition(),
-//                                cached.key(),
-//                                ctx.localNodeId(),
-//                                (IgniteUuid)null,
-//                                null,
-//                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-//                                null,
-//                                false,
-//                                cached.rawGet(),
-//                                cached.hasValue(),
-//                                null,
-//                                null,
-//                                null,
-//                                false);
+                                grp.addCacheEvent(cached.partition(),
+                                    cached.key(),
+                                    ctx.localNodeId(),
+                                    null,
+                                    null,
+                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                    null,
+                                    false,
+                                    cached.rawGet(),
+                                    cached.hasValue(),
+                                    null,
+                                    null,
+                                    null,
+                                    false);
                             }
                         }
                     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index 8bb9ba0..4696d27 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -661,14 +661,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
 
                                 changed = true;
 
-// TODO IGNITE-5075.
-//                                if (ctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST))
{
-//                                    DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-//
-//                                    cctx.events().addPreloadEvent(p,
-//                                        EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-//                                        discoEvt.type(), discoEvt.timestamp());
-//                                }
+                                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST))
{
+                                    DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                                    grp.addRebalanceEvent(p,
+                                        EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                                        discoEvt.eventNode(),
+                                        discoEvt.type(),
+                                        discoEvt.timestamp());
+                                }
 
                                 if (log.isDebugEnabled())
                                     log.debug("Owned partition: " + locPart);
@@ -1471,10 +1472,13 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology
{
                         }
                     }
 
-// TODO: IGNITE-5075.
-//                    if (cctx.events().isRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
-//                        cctx.events().addPreloadEvent(part, EVT_CACHE_REBALANCE_PART_DATA_LOST,
-//                            discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
+                    if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST))
{
+                        grp.addRebalanceEvent(part,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                            discoEvt.eventNode(),
+                            discoEvt.type(),
+                            discoEvt.timestamp());
+                    }
                 }
 
                 if (plc != PartitionLossPolicy.IGNORE)

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 1a9eb68..fbe0aaa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -110,18 +110,6 @@ public class GridDhtPartitionDemander {
     private final Map<Integer, Object> rebalanceTopics;
 
     /**
-     * Started event sent.
-     * Make sense for replicated cache only.
-     */
-    private final AtomicBoolean startedEvtSent = new AtomicBoolean();
-
-    /**
-     * Stopped event sent.
-     * Make sense for replicated cache only.
-     */
-    private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
-
-    /**
      * @param grp Ccahe group.
      */
     public GridDhtPartitionDemander(CacheGroupInfrastructure grp) {
@@ -254,11 +242,10 @@ public class GridDhtPartitionDemander {
      * @param type Type.
      * @param discoEvt Discovery event.
      */
-    private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+    private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) {
         assert discoEvt != null;
 
-        // TODO IGNITE-5075.
-        // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(),
discoEvt.timestamp());
+        grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
     }
 
     /**
@@ -293,7 +280,7 @@ public class GridDhtPartitionDemander {
         if (delay == 0 || force) {
             final RebalanceFuture oldFut = rebalanceFut;
 
-            final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, startedEvtSent,
stoppedEvtSent, cnt);
+            final RebalanceFuture fut = new RebalanceFuture(grp, assigns, log, cnt);
 
             if (!oldFut.isInitial())
                 oldFut.cancel();
@@ -823,20 +810,11 @@ public class GridDhtPartitionDemander {
      */
     public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
         /** */
-        private static final long serialVersionUID = 1L;
-
-        /** */
         private final GridCacheSharedContext<?, ?> ctx;
 
         /** */
         private final CacheGroupInfrastructure grp;
 
-        /** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
-        private final AtomicBoolean startedEvtSent;
-
-        /** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
-        private final AtomicBoolean stoppedEvtSent;
-
         /** */
         private final IgniteLogger log;
 
@@ -860,16 +838,12 @@ public class GridDhtPartitionDemander {
          * @param assigns Assigns.
          * @param grp Cache group.
          * @param log Logger.
-         * @param startedEvtSent Start event sent flag.
-         * @param stoppedEvtSent Stop event sent flag.
          * @param updateSeq Update sequence.
          */
         RebalanceFuture(
             CacheGroupInfrastructure grp,
             GridDhtPreloaderAssignments assigns,
             IgniteLogger log,
-            AtomicBoolean startedEvtSent,
-            AtomicBoolean stoppedEvtSent,
             long updateSeq) {
             assert assigns != null;
 
@@ -877,8 +851,6 @@ public class GridDhtPartitionDemander {
             this.topVer = assigns.topologyVersion();
             this.grp = grp;
             this.log = log;
-            this.startedEvtSent = startedEvtSent;
-            this.stoppedEvtSent = stoppedEvtSent;
             this.updateSeq = updateSeq;
 
             ctx= grp.shared();
@@ -893,8 +865,6 @@ public class GridDhtPartitionDemander {
             this.ctx = null;
             this.grp = null;
             this.log = null;
-            this.startedEvtSent = null;
-            this.stoppedEvtSent = null;
             this.updateSeq = -1;
         }
 
@@ -1023,10 +993,8 @@ public class GridDhtPartitionDemander {
                 if (isDone())
                     return;
 
-                // TODO IGNITE-5075.
-//                if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
-//                    preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
-//                        exchFut.discoveryEvent());
+                if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
+                    rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent());
 
                 T2<Long, Collection<Integer>> t = remaining.get(nodeId);
 
@@ -1057,19 +1025,18 @@ public class GridDhtPartitionDemander {
          * @param type Type.
          * @param discoEvt Discovery event.
          */
-        private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
+        private void rebalanceEvent(int part, int type, DiscoveryEvent discoEvt) {
             assert discoEvt != null;
 
-            // TODO IGNITE-5075.
-            // cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(),
discoEvt.timestamp());
+            grp.addRebalanceEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
         }
 
         /**
          * @param type Type.
          * @param discoEvt Discovery event.
          */
-        private void preloadEvent(int type, DiscoveryEvent discoEvt) {
-            preloadEvent(-1, type, discoEvt);
+        private void rebalanceEvent(int type, DiscoveryEvent discoEvt) {
+            rebalanceEvent(-1, type, discoEvt);
         }
 
         /**
@@ -1119,26 +1086,16 @@ public class GridDhtPartitionDemander {
          *
          */
         private void sendRebalanceStartedEvent() {
-            // TODO IGNITE-5075.
-//            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
-//                (!cctx.isReplicated() || !startedEvtSent.get())) {
-//                preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
-//
-//                startedEvtSent.set(true);
-//            }
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED))
+                rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
         }
 
         /**
          *
          */
         private void sendRebalanceFinishedEvent() {
-            // TODO IGNITE-5075.
-//            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
-//                (!cctx.isReplicated() || !stoppedEvtSent.get())) {
-//                preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
-//
-//                stoppedEvtSent.set(true);
-//            }
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED))
+                rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
         }
 
         /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8dfb4d2..afde2cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -300,14 +300,15 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                 if (picked.isEmpty()) {
                     top.own(part);
 
-// TODO IGNITE-5075.
-//                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST))
{
-//                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
-//
-//                        cctx.events().addPreloadEvent(p,
-//                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-//                            discoEvt.type(), discoEvt.timestamp());
-//                    }
+                    if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
+                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                        grp.addRebalanceEvent(p,
+                            EVT_CACHE_REBALANCE_PART_DATA_LOST,
+                            discoEvt.eventNode(),
+                            discoEvt.type(),
+                            discoEvt.timestamp());
+                    }
 
                     if (log.isDebugEnabled())
                         log.debug("Owning partition as there are no other owners: " + part);
@@ -594,9 +595,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         try {
             top.onEvicted(part, updateSeq);
 
-// TODO IGNITE-5075.
-//            if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
-//                cctx.events().addUnloadEvent(part.id());
+            if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
+                grp.addUnloadEvent(part.id());
 
             if (updateSeq)
                 ctx.exchange().scheduleResendPartitions();
@@ -696,8 +696,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         demandLock.writeLock().lock();
 
         try {
-            // TODO IGNITE-5075.
-            // cctx.deploy().unwind(cctx);
+            grp.unwindUndeploys();
         }
         finally {
             demandLock.writeLock().unlock();

http://git-wip-us.apache.org/repos/asf/ignite/blob/bd017157/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
index 3011afa..9292b5b 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2PkHashIndex.java
@@ -126,7 +126,6 @@ public class H2PkHashIndex extends GridH2IndexBase {
     @Override public GridH2Row findOne(GridH2Row row) {
         try {
             for (IgniteCacheOffheapManager.CacheDataStore store : cctx.offheap().cacheDataStores())
{
-                // TODO IGNITE-5075.
                 CacheDataRow found = store.find(cctx, row.key);
 
                 if (found != null)


Mime
View raw message