ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [19/50] [abbrv] ignite git commit: IGNITE-3477 - Remove waitForRent() in exchange future
Date Wed, 03 May 2017 07:21:57 GMT
http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
index 9ef9c8d..cd731cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectMap;
@@ -62,11 +63,27 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /** Partitions update counters. */
     @GridToStringInclude
     @GridDirectTransient
-    private Map<Integer, Map<Integer, T2<Long, Long>>> partCntrs;
+    private IgniteDhtPartitionCountersMap partCntrs;
 
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
 
+    /** Partitions history suppliers. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private IgniteDhtPartitionHistorySuppliersMap partHistSuppliers;
+
+    /** Serialized partitions history suppliers. */
+    private byte[] partHistSuppliersBytes;
+
+    /** Partitions that must be cleared and re-loaded. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private IgniteDhtPartitionsToReloadMap partsToReload;
+
+    /** Serialized partitions that must be cleared and re-loaded. */
+    private byte[] partsToReloadBytes;
+
     /** Topology version. */
     private AffinityTopologyVersion topVer;
 
@@ -96,12 +113,16 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      */
     public GridDhtPartitionsFullMessage(@Nullable GridDhtPartitionExchangeId id,
         @Nullable GridCacheVersion lastVer,
-        @NotNull AffinityTopologyVersion topVer) {
+        @NotNull AffinityTopologyVersion topVer,
+        @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers,
+        @Nullable IgniteDhtPartitionsToReloadMap partsToReload) {
         super(id, lastVer);
 
         assert id == null || topVer.equals(id.topologyVersion());
 
         this.topVer = topVer;
+        this.partHistSuppliers = partHistSuppliers;
+        this.partsToReload = partsToReload;
     }
 
     /**
@@ -159,10 +180,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      */
     public void addPartitionUpdateCounters(int cacheId, Map<Integer, T2<Long, Long>>
cntrMap) {
         if (partCntrs == null)
-            partCntrs = new HashMap<>();
+            partCntrs = new IgniteDhtPartitionCountersMap();
 
-        if (!partCntrs.containsKey(cacheId))
-            partCntrs.put(cacheId, cntrMap);
+        partCntrs.putIfAbsent(cacheId, cntrMap);
     }
 
     /**
@@ -171,9 +191,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
      */
     @Override public Map<Integer, T2<Long, Long>> partitionUpdateCounters(int
cacheId) {
         if (partCntrs != null) {
-            Map<Integer, T2<Long, Long>> res = partCntrs.get(cacheId);
-
-            return res != null ? res : Collections.<Integer, T2<Long, Long>>emptyMap();
+            return partCntrs.get(cacheId);
         }
 
         return Collections.emptyMap();
@@ -182,6 +200,23 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     /**
      *
      */
+    public IgniteDhtPartitionHistorySuppliersMap partitionHistorySuppliers() {
+        if (partHistSuppliers == null)
+            return IgniteDhtPartitionHistorySuppliersMap.empty();
+
+        return partHistSuppliers;
+    }
+
+    public Set<Integer> partsToReload(UUID nodeId, int cacheId) {
+        if (partsToReload == null)
+            return Collections.emptySet();
+
+        return partsToReload.get(nodeId, cacheId);
+    }
+
+    /**
+     *
+     */
     public Map<UUID, Exception> getExceptionsMap() {
         return exs;
     }
@@ -199,11 +234,15 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
         boolean marshal = (parts != null && partsBytes == null) ||
             (partCntrs != null && partCntrsBytes == null) ||
+            (partHistSuppliers != null && partHistSuppliersBytes == null) ||
+            (partsToReload != null && partsToReloadBytes == null) ||
             (exs != null && exsBytes == null);
 
         if (marshal) {
             byte[] partsBytes0 = null;
             byte[] partCntrsBytes0 = null;
+            byte[] partHistSuppliersBytes0 = null;
+            byte[] partsToReloadBytes0 = null;
             byte[] exsBytes0 = null;
 
             if (parts != null && partsBytes == null)
@@ -212,6 +251,12 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
             if (partCntrs != null && partCntrsBytes == null)
                 partCntrsBytes0 = U.marshal(ctx, partCntrs);
 
+            if (partHistSuppliers != null && partHistSuppliersBytes == null)
+                partHistSuppliersBytes0 = U.marshal(ctx, partHistSuppliers);
+
+            if (partsToReload != null && partsToReloadBytes == null)
+                partsToReloadBytes0 = U.marshal(ctx, partsToReload);
+
             if (exs != null && exsBytes == null)
                 exsBytes0 = U.marshal(ctx, exs);
 
@@ -221,10 +266,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 try {
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+                    byte[] partHistSuppliersBytesZip = U.zip(partHistSuppliersBytes0);
+                    byte[] partsToReloadBytesZip = U.zip(partsToReloadBytes0);
                     byte[] exsBytesZip = U.zip(exsBytes0);
 
                     partsBytes0 = partsBytesZip;
                     partCntrsBytes0 = partCntrsBytesZip;
+                    partHistSuppliersBytes0 = partHistSuppliersBytesZip;
+                    partsToReloadBytes0 = partsToReloadBytesZip;
                     exsBytes0 = exsBytesZip;
 
                     compressed(true);
@@ -236,6 +285,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
 
             partsBytes = partsBytes0;
             partCntrsBytes = partCntrsBytes0;
+            partHistSuppliersBytes = partHistSuppliersBytes0;
+            partsToReloadBytes = partsToReloadBytes0;
             exsBytes = exsBytes0;
         }
     }
@@ -302,10 +353,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
+        if (partHistSuppliersBytes != null && partHistSuppliers == null) {
+            if (compressed())
+                partHistSuppliers = U.unmarshalZip(ctx.marshaller(), partHistSuppliersBytes,
U.resolveClassLoader(ldr, ctx.gridConfig()));
+            else
+                partHistSuppliers = U.unmarshal(ctx, partHistSuppliersBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+        }
+
+        if (partsToReloadBytes != null && partsToReload == null) {
+            if (compressed())
+                partsToReload = U.unmarshalZip(ctx.marshaller(), partsToReloadBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+            else
+                partsToReload = U.unmarshal(ctx, partsToReloadBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+        }
+
         if (partCntrs == null)
-            partCntrs = new HashMap<>();
+            partCntrs = new IgniteDhtPartitionCountersMap();
 
-        if (exsBytes != null && exs == null){
+        if (exsBytes != null && exs == null) {
             if (compressed())
                 exs = U.unmarshalZip(ctx.marshaller(), exsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
             else
@@ -350,12 +415,24 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeByteArray("partsBytes", partsBytes))
+                if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes))
                     return false;
 
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeByteArray("partsBytes", partsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
+                if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 12:
                 if (!writer.writeMessage("topVer", topVer))
                     return false;
 
@@ -402,7 +479,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 9:
-                partsBytes = reader.readByteArray("partsBytes");
+                partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -410,6 +487,22 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
                 reader.incrementState();
 
             case 10:
+                partsBytes = reader.readByteArray("partsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
+                partsToReloadBytes = reader.readByteArray("partsToReloadBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 12:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -428,9 +521,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa
     }
 
     //todo
+
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 13;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
index 2f16c8c..8df7466 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java
@@ -65,6 +65,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     /** Serialized partitions counters. */
     private byte[] partCntrsBytes;
 
+    /** Partitions history reservation counters. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private Map<Integer, Map<Integer, Long>> partHistCntrs;
+
+    /** Serialized partitions history reservation counters. */
+    private byte[] partHistCntrsBytes;
+
     /** Exception. */
     @GridToStringInclude
     @GridDirectTransient
@@ -159,6 +167,43 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Partition history counters.
+     */
+    public void partitionHistoryCounters(int cacheId, Map<Integer, Long> cntrMap) {
+        if (cntrMap.isEmpty())
+            return;
+
+        if (partHistCntrs == null)
+            partHistCntrs = new HashMap<>();
+
+        partHistCntrs.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cntrMap Partition history counters.
+     */
+    public void partitionHistoryCounters(Map<Integer, Map<Integer, Long>> cntrMap)
{
+        for (Map.Entry<Integer, Map<Integer, Long>> e : cntrMap.entrySet()) {
+            partitionHistoryCounters(e.getKey(), e.getValue());
+        }
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Partition history counters.
+     */
+    public Map<Integer, Long> partitionHistoryCounters(int cacheId) {
+        if (partHistCntrs != null) {
+            Map<Integer, Long> res = partHistCntrs.get(cacheId);
+
+            return res != null ? res : Collections.<Integer, Long>emptyMap();
+        }
+
+        return Collections.emptyMap();
+    }
+
+    /**
      * @return Local partitions.
      */
     public Map<Integer, GridDhtPartitionMap2> partitions() {
@@ -189,11 +234,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
         boolean marshal = (parts != null && partsBytes == null) ||
             (partCntrs != null && partCntrsBytes == null) ||
+            (partHistCntrs != null && partHistCntrsBytes == null) ||
             (ex != null && exBytes == null);
 
         if (marshal) {
             byte[] partsBytes0 = null;
             byte[] partCntrsBytes0 = null;
+            byte[] partHistCntrsBytes0 = null;
             byte[] exBytes0 = null;
 
             if (parts != null && partsBytes == null)
@@ -202,6 +249,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
             if (partCntrs != null && partCntrsBytes == null)
                 partCntrsBytes0 = U.marshal(ctx, partCntrs);
 
+            if (partHistCntrs != null && partHistCntrsBytes == null)
+                partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs);
+
             if (ex != null && exBytes == null)
                 exBytes0 = U.marshal(ctx, ex);
 
@@ -211,10 +261,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 try {
                     byte[] partsBytesZip = U.zip(partsBytes0);
                     byte[] partCntrsBytesZip = U.zip(partCntrsBytes0);
+                    byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0);
                     byte[] exBytesZip = U.zip(exBytes0);
 
                     partsBytes0 = partsBytesZip;
                     partCntrsBytes0 = partCntrsBytesZip;
+                    partHistCntrsBytes0 = partHistCntrsBytesZip;
                     exBytes0 = exBytesZip;
 
                     compressed(true);
@@ -226,6 +278,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
 
             partsBytes = partsBytes0;
             partCntrsBytes = partCntrsBytes0;
+            partHistCntrsBytes = partHistCntrsBytes0;
             exBytes = exBytes0;
         }
     }
@@ -248,6 +301,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 partCntrs = U.unmarshal(ctx, partCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig()));
         }
 
+        if (partHistCntrsBytes != null && partHistCntrs == null) {
+            if (compressed())
+                partHistCntrs = U.unmarshalZip(ctx.marshaller(), partHistCntrsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+            else
+                partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
+        }
+
         if (exBytes != null && ex == null) {
             if (compressed())
                 ex = U.unmarshalZip(ctx.marshaller(), exBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
@@ -316,6 +376,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 writer.incrementState();
 
             case 10:
+                if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes))
+                    return false;
+
+                writer.incrementState();
+
+            case 11:
                 if (!writer.writeByteArray("partsBytes", partsBytes))
                     return false;
 
@@ -370,6 +436,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
                 reader.incrementState();
 
             case 10:
+                partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 11:
                 partsBytes = reader.readByteArray("partsBytes");
 
                 if (!reader.isLastRead())
@@ -388,9 +462,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes
     }
 
     //todo add ex
+
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 11;
+        return 12;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 8ada4b3..55f6bcb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -75,6 +75,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
 import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING;
 import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap;
 
 /**
@@ -235,6 +236,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
         top = null;
     }
+
     /**
      * @return Node stop exception.
      */
@@ -270,8 +272,8 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         assert exchFut.forcePreload() || exchFut.dummyReassign() ||
             exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) :
             "Topology version mismatch [exchId=" + exchFut.exchangeId() +
-            ", cache=" + cctx.name() +
-            ", topVer=" + top.topologyVersion() + ']';
+                ", cache=" + cctx.name() +
+                ", topVer=" + top.topologyVersion() + ']';
 
         GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion());
 
@@ -290,47 +292,95 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
 
             // If partition belongs to local node.
             if (cctx.affinity().partitionLocalNode(p, topVer)) {
-                GridDhtLocalPartition part = top.localPartition(p, topVer, true);
+                GridDhtLocalPartition part = top.localPartition(p, topVer, true, true);
 
                 assert part != null;
                 assert part.id() == p;
 
-                if (part.state() != MOVING) {
-                    if (log.isDebugEnabled())
-                        log.debug("Skipping partition assignment (state is not MOVING): "
+ part);
-
-                    continue; // For.
-                }
+                ClusterNode histSupplier = null;
 
-                Collection<ClusterNode> picked = pickedOwners(p, topVer);
+                if (cctx.shared().database().persistenceEnabled()) {
+                    UUID nodeId = exchFut.partitionHistorySupplier(cctx.cacheId(), p);
 
-                if (picked.isEmpty()) {
-                    top.own(part);
+                    if (nodeId != null)
+                        histSupplier = cctx.discovery().node(nodeId);
+                }
 
-                    if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
-                        DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+                if (histSupplier != null) {
+                    if (part.state() != MOVING) {
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping partition assignment (state is not MOVING):
" + part);
 
-                        cctx.events().addPreloadEvent(p,
-                            EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
-                            discoEvt.type(), discoEvt.timestamp());
+                        continue; // For.
                     }
 
-                    if (log.isDebugEnabled())
-                        log.debug("Owning partition as there are no other owners: " + part);
-                }
-                else {
-                    ClusterNode n = F.rand(picked);
+                    assert cctx.shared().database().persistenceEnabled();
+                    assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p,
topVer);
 
-                    GridDhtPartitionDemandMessage msg = assigns.get(n);
+                    GridDhtPartitionDemandMessage msg = assigns.get(histSupplier);
 
                     if (msg == null) {
-                        assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                        assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage(
                             top.updateSequence(),
                             exchFut.exchangeId().topologyVersion(),
                             cctx.cacheId()));
                     }
 
-                    msg.addPartition(p);
+                    msg.addPartition(p, true);
+                }
+                else {
+                    if (cctx.shared().database().persistenceEnabled()) {
+                        if (part.state() == RENTING || part.state() == EVICTED) {
+                            try {
+                                part.rent(false).get();
+                            }
+                            catch (IgniteCheckedException e) {
+                                U.error(log, "Error while clearing outdated local partition",
e);
+                            }
+
+                            part = top.localPartition(p, topVer, true);
+
+                            assert part != null;
+                        }
+                    }
+
+                    if (part.state() != MOVING) {
+                        if (log.isDebugEnabled())
+                            log.debug("Skipping partition assignment (state is not MOVING):
" + part);
+
+                        continue; // For.
+                    }
+
+                    Collection<ClusterNode> picked = pickedOwners(p, topVer);
+
+                    if (picked.isEmpty()) {
+                        top.own(part);
+
+                        if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST))
{
+                            DiscoveryEvent discoEvt = exchFut.discoveryEvent();
+
+                            cctx.events().addPreloadEvent(p,
+                                EVT_CACHE_REBALANCE_PART_DATA_LOST, discoEvt.eventNode(),
+                                discoEvt.type(), discoEvt.timestamp());
+                        }
+
+                        if (log.isDebugEnabled())
+                            log.debug("Owning partition as there are no other owners: " +
part);
+                    }
+                    else {
+                        ClusterNode n = F.rand(picked);
+
+                        GridDhtPartitionDemandMessage msg = assigns.get(n);
+
+                        if (msg == null) {
+                            assigns.put(n, msg = new GridDhtPartitionDemandMessage(
+                                top.updateSequence(),
+                                exchFut.exchangeId().topologyVersion(),
+                                cctx.cacheId()));
+                        }
+
+                        msg.addPartition(p, false);
+                    }
                 }
             }
         }
@@ -379,7 +429,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2
s) {
+    @Override public void handleSupplyMessage(int idx, UUID id, final GridDhtPartitionSupplyMessageV2
s) {
         if (!enterBusy())
             return;
 
@@ -399,7 +449,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
     }
 
     /** {@inheritDoc} */
-    public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage d) {
+    @Override public void handleDemandMessage(int idx, UUID id, GridDhtPartitionDemandMessage
d) {
         if (!enterBusy())
             return;
 
@@ -789,7 +839,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
                                 try {
                                     part.tryEvict();
 
-                                    if (part.state() != EVICTED)
+                                    if (part.state() == RENTING)
                                         partsToEvict.push(part);
                                 }
                                 catch (Throwable ex) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
new file mode 100644
index 0000000..9db80ae
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionCountersMap.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.internal.util.typedef.T2;
+
+/**
+ * Partition counters map.
+ */
+public class IgniteDhtPartitionCountersMap implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private Map<Integer, Map<Integer, T2<Long, Long>>> map;
+
+    /**
+     * @param cacheId Cache ID.
+     * @param cntrMap Counters map.
+     */
+    public synchronized void putIfAbsent(int cacheId, Map<Integer, T2<Long, Long>>
cntrMap) {
+        if (map == null)
+            map = new HashMap<>();
+
+        if (!map.containsKey(cacheId))
+            map.put(cacheId, cntrMap);
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @return Counters map.
+     */
+    public synchronized Map<Integer, T2<Long, Long>> get(int cacheId) {
+        if (map == null)
+            map = new HashMap<>();
+
+        Map<Integer, T2<Long, Long>> cntrMap = map.get(cacheId);
+
+        if (cntrMap == null)
+            return Collections.emptyMap();
+
+        return cntrMap;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
new file mode 100644
index 0000000..333eb97
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionHistorySuppliersMap.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class IgniteDhtPartitionHistorySuppliersMap implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private static final IgniteDhtPartitionHistorySuppliersMap EMPTY = new IgniteDhtPartitionHistorySuppliersMap();
+
+    /** */
+    private Map<UUID, Map<T2<Integer, Integer>, Long>> map;
+
+    /**
+     * @return Empty map.
+     */
+    public static IgniteDhtPartitionHistorySuppliersMap empty() {
+        return EMPTY;
+    }
+
+    /**
+     * @param cacheId Cache ID.
+     * @param partId Partition ID.
+     * @return Supplier UUID.
+     */
+    @Nullable public synchronized UUID getSupplier(int cacheId, int partId) {
+        if (map == null)
+            return null;
+
+        for (Map.Entry<UUID, Map<T2<Integer, Integer>, Long>> e : map.entrySet())
{
+            if (e.getValue().containsKey(new T2<>(cacheId, partId)))
+                return e.getKey();
+        }
+
+        return null;
+    }
+
+    /**
+     * @param nodeId Node ID to check.
+     * @return Reservations for the given node.
+     */
+    @Nullable public synchronized Map<T2<Integer, Integer>, Long> getReservations(UUID
nodeId) {
+        if (map == null)
+            return null;
+
+        return map.get(nodeId);
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param cacheId Cache ID.
+     * @param partId Partition ID.
+     * @param cntr Partition counter.
+     */
+    public synchronized void put(UUID nodeId, int cacheId, int partId, long cntr) {
+        Map<T2<Integer, Integer>, Long> nodeMap = map.get(nodeId);
+
+        if (nodeMap == null) {
+            nodeMap = new HashMap<>();
+
+            map.put(nodeId, nodeMap);
+        }
+
+        nodeMap.put(new T2<>(cacheId, partId), cntr);
+    }
+
+    /**
+     * @return {@code True} if empty.
+     */
+    public synchronized boolean isEmpty() {
+        return map == null || map.isEmpty();
+    }
+
+    /**
+     * @param that Other map to put.
+     */
+    public synchronized void putAll(IgniteDhtPartitionHistorySuppliersMap that) {
+        map = that.map;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7e956ed0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
new file mode 100644
index 0000000..2a72e95
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtPartitionsToReloadMap.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Partition reload map.
+ */
+public class IgniteDhtPartitionsToReloadMap implements Serializable {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private Map<UUID, Map<Integer, Set<Integer>>> map;
+
+    /**
+     * @param nodeId Node ID.
+     * @param cacheId Cache ID.
+     * @return Collection of partitions to reload.
+     */
+    public synchronized Set<Integer> get(UUID nodeId, int cacheId) {
+        if (map == null)
+            return Collections.emptySet();
+
+        Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
+
+        if (nodeMap == null)
+            return Collections.emptySet();
+
+        Set<Integer> parts = nodeMap.get(cacheId);
+
+        if (parts == null)
+            return Collections.emptySet();
+
+        return parts;
+    }
+
+    /**
+     * @param nodeId Node ID.
+     * @param cacheId Cache ID.
+     * @param partId Partition ID.
+     */
+    public synchronized void put(UUID nodeId, int cacheId, int partId) {
+        if (map == null)
+            map = new HashMap<>();
+
+        Map<Integer, Set<Integer>> nodeMap = map.get(nodeId);
+
+        if (nodeMap == null) {
+            nodeMap = new HashMap<>();
+
+            map.put(nodeId, nodeMap);
+        }
+
+        Set<Integer> parts = nodeMap.get(cacheId);
+
+        if (parts == null) {
+            parts = new HashSet<>();
+
+            nodeMap.put(cacheId, parts);
+        }
+
+        parts.add(partId);
+    }
+}


Mime
View raw message