ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [09/14] ignite git commit: https://issues.apache.org/jira/browse/IGNITE-2329 - single get
Date Fri, 05 Feb 2016 12:25:45 GMT
https://issues.apache.org/jira/browse/IGNITE-2329 - single get


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7b987466
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7b987466
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7b987466

Branch: refs/heads/ignite-2329-1
Commit: 7b987466af3fa0789e4cde8eb3f9a44126be129c
Parents: 3e97d82
Author: Yakov Zhdanov <yzhdanov@gridgain.com>
Authored: Tue Feb 2 18:50:11 2016 +0300
Committer: Yakov Zhdanov <yzhdanov@gridgain.com>
Committed: Tue Feb 2 18:50:11 2016 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  18 +-
 .../processors/cache/GridCachePreloader.java    |   6 +
 .../cache/GridCachePreloaderAdapter.java        |   5 +
 .../distributed/dht/GridDhtCacheAdapter.java    |  69 ++-
 .../distributed/dht/GridDhtEmbeddedFuture.java  |  13 +-
 .../distributed/dht/GridDhtGetSingleFuture.java | 479 +++++++++++++++++++
 .../dht/preloader/GridDhtPreloader.java         |  12 +
 7 files changed, 564 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 986e529..ef7d30a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1852,7 +1852,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                         ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion())
:
                         tx.topologyVersion();
 
-                final Map<K1, V1> map = U.newHashMap(keys.size());
+                final Map<K1, V1> map = keys.size() == 1 ?
+                    (Map<K1, V1>)new IgniteBiTuple<>() :
+                    U.<K1, V1>newHashMap(keys.size());
 
                 final boolean storeEnabled = !skipVals && readThrough &&
ctx.readThrough();
 
@@ -2046,17 +2048,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K,
V
                         }
                     );
                 }
-                else {
-                    // If misses is not empty and store is disabled, we should touch missed
entries.
-                    if (misses != null) {
-                        for (KeyCacheObject key : misses.keySet()) {
-                            GridCacheEntryEx entry = peekEx(key);
-
-                            if (entry != null)
-                                ctx.evicts().touch(entry, topVer);
-                        }
-                    }
-                }
+                else
+                    // Misses can be non-zero only if store is enabled.
+                    assert misses == null;
 
                 return new GridFinishedFuture<>(map);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
index c8fcb90..be019fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java
@@ -137,6 +137,12 @@ public interface GridCachePreloader {
     public IgniteInternalFuture<Boolean> rebalanceFuture();
 
     /**
+     * @return {@code true} if keys preloading has to be forced, {@code false} if there is
+     *      no need for that (e.g. rebalancing has been completed).
+     */
+    public boolean needForceKeys();
+
+    /**
      * Requests that preloader sends the request for the key.
      *
      * @param keys Keys to request.

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
index a1704fc..5d98c6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java
@@ -93,6 +93,11 @@ public class GridCachePreloaderAdapter implements GridCachePreloader {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean needForceKeys() {
+        // This adapter never asks callers to force keys preloading.
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public void onReconnected() {
         // No-op.
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 749ee4d..8e456e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -719,21 +719,63 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
     /**
+     * Creates a {@link GridDhtGetSingleFuture} for a single key (without a transaction),
+     * initializes it and returns it.
+     *
      * @param nodeId Node ID.
+     * @param msgId Message ID.
+     * @param key Key.
+     * @param addRdr Add reader flag.
+     * @param readThrough Read through flag.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash.
+     * @param expiry Expiry.
+     * @param skipVals Skip vals flag.
+     * @return Future for the operation.
+     */
+    private IgniteInternalFuture<GridCacheEntryInfo> getDhtSingleAsync(
+        UUID nodeId,
+        long msgId,
+        KeyCacheObject key,
+        boolean addRdr,
+        boolean readThrough,
+        AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        @Nullable IgniteCacheExpiryPolicy expiry,
+        boolean skipVals
+    ) {
+        // The node ID is handed to the future as its reader; no transaction is involved.
+        GridDhtGetSingleFuture<K, V> fut = new GridDhtGetSingleFuture<>(
+            ctx,
+            msgId,
+            nodeId,
+            key,
+            addRdr,
+            readThrough,
+            /*tx*/null,
+            topVer,
+            subjId,
+            taskNameHash,
+            expiry,
+            skipVals);
+
+        // Kicks off processing of the request.
+        fut.init();
+
+        return fut;
+    }
+
+    /**
+     * @param nodeId Node ID.
      * @param req Get request.
      */
     protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest
req) {
         assert ctx.affinityNode();
 
-        long ttl = req.accessTtl();
-
-        final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl);
-
-        Map<KeyCacheObject, Boolean> map = Collections.singletonMap(req.key(), req.addReader());
+        final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(req.accessTtl());
 
-        IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut =
-            getDhtAsync(nodeId,
+        IgniteInternalFuture<GridCacheEntryInfo> fut =
+            getDhtSingleAsync(
+                nodeId,
                 req.messageId(),
-                map,
+                req.key(),
+                req.addReader(),
                 req.readThrough(),
                 req.topologyVersion(),
                 req.subjectId(),
@@ -741,19 +783,16 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
                 expiryPlc,
                 req.skipValues());
 
-        fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>()
{
-            @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>>
f) {
+        fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() {
+            @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f)
{
                 GridNearSingleGetResponse res;
 
-                GridDhtFuture<Collection<GridCacheEntryInfo>> fut =
-                    (GridDhtFuture<Collection<GridCacheEntryInfo>>)f;
+                GridDhtFuture<GridCacheEntryInfo> fut = (GridDhtFuture<GridCacheEntryInfo>)f;
 
                 try {
-                    Collection<GridCacheEntryInfo> entries = fut.get();
+                    GridCacheEntryInfo info = fut.get();
 
                     if (F.isEmpty(fut.invalidPartitions())) {
-                        GridCacheEntryInfo info = F.first(entries);
-
                         Message res0 = null;
 
                         if (info != null) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
index 0d10a93..1b9f743 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtEmbeddedFuture.java
@@ -21,7 +21,6 @@ import java.util.Collection;
 import java.util.Collections;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
-import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteBiClosure;
 
@@ -32,10 +31,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A,
B> implem
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Retries. */
-    @GridToStringInclude
-    private Collection<Integer> invalidParts;
-
     /**
      * @param c Closure.
      * @param embedded Embedded.
@@ -45,8 +40,6 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A,
B> implem
         IgniteInternalFuture<B> embedded
     ) {
         super(c, embedded);
-
-        invalidParts = Collections.emptyList();
     }
 
     /**
@@ -58,17 +51,15 @@ public class GridDhtEmbeddedFuture<A, B> extends GridEmbeddedFuture<A,
B> implem
         IgniteBiClosure<B, Exception, IgniteInternalFuture<A>> c
     ) {
         super(embedded, c);
-
-        invalidParts = Collections.emptyList();
     }
 
     /** {@inheritDoc} */
     @Override public Collection<Integer> invalidPartitions() {
-        return invalidParts;
+        return Collections.emptyList();
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridDhtEmbeddedFuture.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
new file mode 100644
index 0000000..4439307
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetSingleFuture.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
+import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
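+/**
+ * DHT future that gets a single key: it reserves the key's local partition, optionally registers
+ * the requesting node as a reader and completes with the resulting {@link GridCacheEntryInfo}.
+ */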
+public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo>
+    implements GridDhtFuture<GridCacheEntryInfo> {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** Logger reference. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger. */
+    private static IgniteLogger log;
+
+    /** Message ID. */
+    private long msgId;
+
+    /** Reader node ID. */
+    private UUID reader;
+
+    /** Read through flag. */
+    private boolean readThrough;
+
+    /** Context. */
+    private GridCacheContext<K, V> cctx;
+
+    /** Key. */
+    private KeyCacheObject key;
+
+    /** Add reader flag. */
+    private boolean addRdr;
+
+    /** ID of the partition reserved by this future ({@code -1} if none is reserved). */
+    private int part = -1;
+
+    /** Future ID. */
+    private IgniteUuid futId;
+
+    /** Version. */
+    private GridCacheVersion ver;
+
+    /** Topology version. */
+    private AffinityTopologyVersion topVer;
+
+    /** Transaction. */
+    private IgniteTxLocalEx tx;
+
+    /** Retries because ownership changed. */
+    private Collection<Integer> retries;
+
+    /** Subject ID. */
+    private UUID subjId;
+
+    /** Task name hash. */
+    private int taskNameHash;
+
+    /** Expiry policy. */
+    private IgniteCacheExpiryPolicy expiryPlc;
+
+    /** Skip values flag. */
+    private boolean skipVals;
+
+    /**
+     * Creates the future, generates its ID and picks its version: the transaction's XID version
+     * if a transaction is given, otherwise the next version produced by the cache context.
+     *
+     * @param cctx Context.
+     * @param msgId Message ID.
+     * @param reader Reader.
+     * @param key Key.
+     * @param addRdr Add reader flag.
+     * @param readThrough Read through flag.
+     * @param tx Transaction.
+     * @param topVer Topology version.
+     * @param subjId Subject ID.
+     * @param taskNameHash Task name hash code.
+     * @param expiryPlc Expiry policy.
+     * @param skipVals Skip values flag.
+     */
+    public GridDhtGetSingleFuture(
+        GridCacheContext<K, V> cctx,
+        long msgId,
+        UUID reader,
+        KeyCacheObject key,
+        boolean addRdr,
+        boolean readThrough,
+        @Nullable IgniteTxLocalEx tx,
+        @NotNull AffinityTopologyVersion topVer,
+        @Nullable UUID subjId,
+        int taskNameHash,
+        @Nullable IgniteCacheExpiryPolicy expiryPlc,
+        boolean skipVals
+    ) {
+        assert reader != null;
+        assert key != null;
+
+        this.reader = reader;
+        this.cctx = cctx;
+        this.msgId = msgId;
+        this.key = key;
+        // Primitive parameter: no hidden unboxing (and no NPE on null) when storing the flag.
+        this.addRdr = addRdr;
+        this.readThrough = readThrough;
+        this.tx = tx;
+        this.topVer = topVer;
+        this.subjId = subjId;
+        this.taskNameHash = taskNameHash;
+        this.expiryPlc = expiryPlc;
+        this.skipVals = skipVals;
+
+        futId = IgniteUuid.randomUuid();
+
+        ver = tx == null ? cctx.versions().next() : tx.xidVersion();
+
+        // Logger is shared by all instances and is resolved on first use.
+        if (log == null)
+            log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class);
+    }
+
+    /**
+     * Initializes future by starting the mapping of the key (see {@link #map()}).
+     */
+    void init() {
+        map();
+    }
+
+    /**
+     * @return Future ID (generated randomly when the future is constructed).
+     */
+    public IgniteUuid futureId() {
+        return futId;
+    }
+
+    /**
+     * @return Future version (the transaction's XID version if the future was created with
+     *      a transaction, otherwise a version obtained from the cache context at construction).
+     */
+    public GridCacheVersion version() {
+        return ver;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onDone(GridCacheEntryInfo res, Throwable err) {
+        // Somebody else has already completed this future - nothing to clean up here.
+        if (!super.onDone(res, err))
+            return false;
+
+        // This call completed the future: release the partition it reserved, if any.
+        if (part != -1)
+            cctx.topology().releasePartitions(part);
+
+        return true;
+    }
+
+    /**
+     * Starts processing of the key. If the preloader reports that keys have to be forced,
+     * the key is requested from the preloader first and mapping continues once that request
+     * completes; otherwise the key is mapped right away.
+     */
+    private void map() {
+        if (cctx.dht().dhtPreloader().needForceKeys()) {
+            GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(
+                Collections.singleton(key),
+                topVer);
+
+            if (fut != null) {
+                // Remember partitions the force keys request has reported as invalid
+                // (the check must be "not empty", otherwise nothing is ever recorded).
+                if (!F.isEmpty(fut.invalidPartitions())) {
+                    if (retries == null)
+                        retries = new HashSet<>();
+
+                    retries.addAll(fut.invalidPartitions());
+                }
+
+                fut.listen(
+                    new IgniteInClosure<IgniteInternalFuture<Object>>() {
+                        @Override public void apply(IgniteInternalFuture<Object> fut) {
+                            Throwable e = fut.error();
+
+                            if (e != null) { // Check error first.
+                                if (log.isDebugEnabled())
+                                    log.debug("Failed to request keys from preloader " +
+                                        "[keys=" + key + ", err=" + e + ']');
+
+                                onDone(e);
+                            }
+                            else
+                                map0();
+                        }
+                    }
+                );
+
+                return;
+            }
+        }
+
+        map0();
+    }
+
+    /**
+     * Reserves the local partition of the key and starts the actual read. If the partition is
+     * already known to be invalid or cannot be reserved, the future is completed with {@code null}
+     * and the partition is reported through {@link #invalidPartitions()}.
+     */
+    private void map0() {
+        int keyPart = cctx.affinity().partition(key);
+
+        // Partition was reported as invalid earlier: nothing is reserved, so reading must not start.
+        if (retries != null && retries.contains(keyPart)) {
+            onDone((GridCacheEntryInfo)null);
+
+            return;
+        }
+
+        // Partition is missing locally or could not be reserved.
+        if (!map(key)) {
+            retries = Collections.singleton(keyPart);
+
+            onDone((GridCacheEntryInfo)null);
+
+            return;
+        }
+
+        getAsync();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<Integer> invalidPartitions() {
+        if (retries != null)
+            return retries;
+
+        return Collections.emptyList();
+    }
+
+    /**
+     * Looks up the local partition of the key and tries to reserve it for this future.
+     *
+     * @param key Key.
+     * @return {@code True} if mapped.
+     */
+    private boolean map(KeyCacheObject key) {
+        GridDhtLocalPartition locPart;
+
+        if (topVer.topologyVersion() > 0)
+            locPart = cache().topology().localPartition(cctx.affinity().partition(key), topVer, true);
+        else
+            locPart = cache().topology().localPartition(key, false);
+
+        if (locPart == null)
+            return false;
+
+        assert part == -1;
+
+        // By reserving, we make sure that partition won't be unloaded while processed.
+        if (!locPart.reserve())
+            return false;
+
+        part = locPart.id();
+
+        return true;
+    }
+
+    /**
+     * Optionally registers the requesting node as a reader of the entry and then reads the key
+     * asynchronously (through the transaction if one was given, otherwise directly from the DHT
+     * cache). This future is completed with the outcome of that read. Expects the partition to be
+     * reserved already.
+     */
+    @SuppressWarnings( {"unchecked", "IfMayBeConditional"})
+    private void getAsync() {
+        assert part != -1;
+
+        // Prefer the name of the currently running task; otherwise resolve it from the hash.
+        String taskName0 = cctx.kernalContext().job().currentTaskName();
+
+        if (taskName0 == null)
+            taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
+
+        final String taskName = taskName0;
+
+        IgniteInternalFuture<Boolean> rdrFut = null;
+
+        ClusterNode readerNode = cctx.discovery().node(reader);
+
+        // Reader registration is only considered for a known, non-local node for which
+        // cacheNearNode(...) returns true.
+        if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode,
cctx.name())) {
+            while (true) {
+                GridDhtCacheEntry e = cache().entryExx(key, topVer);
+
+                try {
+                    // Obsolete entry: retry with a new one (the finally block still touches this entry).
+                    if (e.obsolete())
+                        continue;
+
+                    boolean addReader = (!e.deleted() && addRdr && !skipVals);
+
+                    if (addReader)
+                        e.unswap(false);
+
+                    // Register reader. If there are active transactions for this entry,
+                    // then will wait for their completion before proceeding.
+                    // TODO: GG-4003:
+                    // TODO: What if any transaction we wait for actually removes this entry?
+                    // TODO: In this case seems like we will be stuck with untracked near entry.
+                    // TODO: To fix, check that reader is contained in the list of readers once
+                    // TODO: again after the returned future completes - if not, try again.
+                    rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null;
+
+                    break;
+                }
+                catch (IgniteCheckedException err) {
+                    onDone(err);
+
+                    return;
+                }
+                catch (GridCacheEntryRemovedException ignore) {
+                    // Entry was removed concurrently: loop again and obtain a new entry.
+                    if (log.isDebugEnabled())
+                        log.debug("Got removed entry when getting a DHT value: " + e);
+                }
+                finally {
+                    cctx.evicts().touch(e, topVer);
+                }
+            }
+        }
+
+        IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>
fut;
+
+        // No reader registration to wait for: start the read right away.
+        // NOTE(review): if rdrFut is already done with an error, that error is not checked
+        // on this path (unlike in the listener below) - confirm this is intended.
+        if (rdrFut == null || rdrFut.isDone()) {
+            if (tx == null) {
+                fut = cache().getDhtAllAsync(
+                    Collections.singleton(key),
+                    readThrough,
+                    subjId,
+                    taskName,
+                    expiryPlc,
+                    skipVals,
+                    /*can remap*/true);
+            }
+            else {
+                fut = tx.getAllAsync(cctx,
+                    Collections.singleton(key),
+                    /*deserialize binary*/false,
+                    skipVals,
+                    /*keep cache objects*/true,
+                    /*skip store*/!readThrough,
+                    false);
+            }
+        }
+        else {
+            // Reader registration is still in progress: start the same read only after it completes.
+            rdrFut.listen(
+                new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                    @Override public void apply(IgniteInternalFuture<Boolean> fut)
{
+                        Throwable e = fut.error();
+
+                        if (e != null) {
+                            onDone(e);
+
+                            return;
+                        }
+
+                        IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject,
GridCacheVersion>>> fut0;
+
+                        if (tx == null) {
+                            fut0 = cache().getDhtAllAsync(
+                                Collections.singleton(key),
+                                readThrough,
+                                subjId,
+                                taskName,
+                                expiryPlc,
+                                skipVals,
+                                /*can remap*/true);
+                        }
+                        else {
+                            fut0 = tx.getAllAsync(cctx,
+                                Collections.singleton(key),
+                                /*deserialize binary*/false,
+                                skipVals,
+                                /*keep cache objects*/true,
+                                /*skip store*/!readThrough,
+                                false
+                            );
+                        }
+
+                        fut0.listen(createGetFutureListener());
+                    }
+                }
+            );
+
+            return;
+        }
+
+        // Handle an already completed read inline, otherwise wait for it.
+        if (fut.isDone())
+            onResult(fut);
+        else
+            fut.listen(createGetFutureListener());
+    }
+
+    /**
+     * Creates a listener that forwards the completed get future to {@link #onResult}.
+     *
+     * @return Listener for get future.
+     */
+    @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject,
GridCacheVersion>>>>
+    createGetFutureListener() {
+        return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject,
GridCacheVersion>>>>() {
+            @Override public void apply(
+                IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>
fut
+            ) {
+                onResult(fut);
+            }
+        };
+    }
+
+    /**
+     * Completes this future with the outcome of the given (already completed) get future:
+     * with its error if it failed, otherwise with the entry info built from its result.
+     *
+     * @param fut Completed future to finish this process with.
+     */
+    private void onResult(IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut) {
+        assert fut.isDone();
+
+        Throwable err = fut.error();
+
+        if (err != null) {
+            onDone(err);
+
+            return;
+        }
+
+        try {
+            onDone(toEntryInfo(fut.get()));
+        }
+        catch (IgniteCheckedException e) {
+            // Not expected because the future finished without an error. Still, complete this
+            // future with the failure: with assertions disabled the exception would otherwise be
+            // dropped and this future would never be completed.
+            onDone(e);
+        }
+    }
+
+    /**
+     * Builds an entry info from the first entry of the given map.
+     *
+     * @param map Map to convert.
+     * @return Entry info for the first map entry, or {@code null} if {@code F.firstEntry(map)}
+     *      yields no entry.
+     */
+    private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map) {
+        Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> first = F.firstEntry(map);
+
+        if (first == null)
+            return null;
+
+        T2<CacheObject, GridCacheVersion> valAndVer = first.getValue();
+
+        assert valAndVer != null;
+
+        GridCacheEntryInfo res = new GridCacheEntryInfo();
+
+        res.cacheId(cctx.cacheId());
+        res.key(first.getKey());
+        // Value is not transferred when values are skipped.
+        res.value(skipVals ? null : valAndVer.get1());
+        res.version(valAndVer.get2());
+
+        return res;
+    }
+
+    /**
+     * @return DHT cache (the context's cache cast to {@link GridDhtCacheAdapter}).
+     */
+    private GridDhtCacheAdapter<K, V> cache() {
+        return (GridDhtCacheAdapter<K, V>)cctx.cache();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/7b987466/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
index 0bff618..a92a080 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java
@@ -693,6 +693,18 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean needForceKeys() {
+        // With rebalancing disabled, forcing is always reported as needed.
+        if (!cctx.rebalanceEnabled())
+            return true;
+
+        IgniteInternalFuture<Boolean> fut = rebalanceFuture();
+
+        // Forcing is unnecessary only once rebalancing has finished with a TRUE result.
+        boolean rebalanced = fut.isDone() && Boolean.TRUE.equals(fut.result());
+
+        return !rebalanced;
+    }
+
     /**
      * @param keys Keys to request.
      * @return Future for request.


Mime
View raw message