ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [02/50] [abbrv] incubator-ignite git commit: # ignite-42
Date Thu, 22 Jan 2015 15:25:04 GMT
# ignite-42


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/738c67fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/738c67fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/738c67fd

Branch: refs/heads/ignite-65
Commit: 738c67fdb935e90025c18fa6c5fe428ffa95eb90
Parents: f4b3995
Author: sboikov <sboikov@gridgain.com>
Authored: Tue Jan 20 14:22:18 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Tue Jan 20 14:22:18 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/IgniteCacheProxy.java      |  37 ++-
 .../IgniteNullArgumentCheckedException.java     |  32 +++
 .../processors/cache/CacheInvokeEntry.java      |  17 +-
 .../processors/cache/GridCacheAdapter.java      | 241 ++++++++++++++++---
 .../processors/cache/GridCacheStoreManager.java | 195 ++++++++++-----
 .../dht/atomic/GridDhtAtomicCache.java          |   4 +
 .../IgniteCacheLoadAllAbstractTest.java         |  65 +++++
 7 files changed, 486 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index 1e4426e..8e346e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -552,10 +552,31 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
 
     /** {@inheritDoc} */
     @Override public void loadAll(Set<? extends K> keys,
-        boolean replaceExistingValues,
-        @Nullable CompletionListener completionLsnr) {
-        // TODO IGNITE-1.
-        throw new UnsupportedOperationException();
+        boolean replaceExisting,
+        @Nullable final CompletionListener completionLsnr) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            IgniteFuture<?>  fut = ctx.cache().loadAll(keys, replaceExisting);
+
+            if (completionLsnr != null) {
+                fut.listenAsync(new CI1<IgniteFuture<?>>() {
+                    @Override public void apply(IgniteFuture<?> fut) {
+                        try {
+                            fut.get();
+
+                            completionLsnr.onCompletion();
+                        }
+                        catch (IgniteCheckedException e) {
+                            completionLsnr.onException(cacheException(e));
+                        }
+                    }
+                });
+            }
+        }
+        finally {
+            gate.leave(prev);
+        }
     }
 
     /** {@inheritDoc} */
@@ -1094,14 +1115,14 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter implements
      * @return Cache exception.
      */
     private CacheException cacheException(IgniteCheckedException e) {
-        Throwable[] suppressed = e.getSuppressed();
+        if (e instanceof GridCachePartialUpdateException)
+            return new CachePartialUpdateException((GridCachePartialUpdateException)e);
+        else if (e instanceof IgniteNullArgumentCheckedException)
+            throw new NullPointerException(e.getMessage());
 
         if (e.getCause() instanceof CacheException)
             return (CacheException)e.getCause();
 
-        if (e instanceof GridCachePartialUpdateException)
-            return new CachePartialUpdateException((GridCachePartialUpdateException)e);
-
         return new CacheException(e);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java
new file mode 100644
index 0000000..42cc75a
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/IgniteNullArgumentCheckedException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.gridgain.grid.kernal;
+
+import org.apache.ignite.*;
+
+/**
+ *
+ */
+public class IgniteNullArgumentCheckedException extends IgniteCheckedException {
+    /**
+     * @param msg Message.
+     */
+    public IgniteNullArgumentCheckedException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
index ab7dfc4..e44aff1 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeEntry.java
@@ -11,6 +11,7 @@ package org.gridgain.grid.kernal.processors.cache;
 
 import org.gridgain.grid.util.tostring.*;
 import org.gridgain.grid.util.typedef.internal.*;
+import org.jetbrains.annotations.*;
 
 import javax.cache.processor.*;
 
@@ -29,13 +30,20 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> {
     /** */
     private boolean modified;
 
+    /** */
+    private final boolean hadVal;
+
     /**
      * @param key Key.
      * @param val Value.
      */
-    public CacheInvokeEntry(K key, V val) {
+    public CacheInvokeEntry(K key, @Nullable V val) {
+        assert key != null;
+
         this.key = key;
         this.val = val;
+
+        hadVal = val != null;
     }
 
     /** {@inheritDoc} */
@@ -76,9 +84,14 @@ public class CacheInvokeEntry<K, V> implements MutableEntry<K, V> {
     }
 
     /**
-     * @return {@code True} if {@link #setValue} or {@link #remove was called}.
+     * @return {@code True} if entry was modified.
      */
     public boolean modified() {
+        if (modified) {
+            if (!hadVal && val == null)
+                return false;
+        }
+
         return modified;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
index ae2da3e..8b27501 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheAdapter.java
@@ -3355,45 +3355,146 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         return ctx.tm().synchronizations();
     }
 
-    /** {@inheritDoc} */
-    @Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args) throws IgniteCheckedException {
-        final boolean replicate = ctx.isDrEnabled();
-        final long topVer = ctx.affinity().affinityTopologyVersion();
+    /**
+     * @param keys Keys.
+     * @param replaceExisting Replace existing values flag.
+     * @return Load future.
+     */
+    public IgniteFuture<?> loadAll(final Set<? extends K> keys,
+        boolean replaceExisting) {
+        A.notNull(keys, "keys");
 
-        if (ctx.store().isLocalStore()) {
-            try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
-                ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
+        if (keys.size() < 10) {
+            for (K key : keys) {
+                if (key == null)
+                    throw new NullPointerException();
+            }
+        }
 
-                final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize());
+        if (!ctx.store().configured())
+            return new GridFinishedFuture<>(ctx.kernalContext());
 
-                ctx.store().loadCache(new CIX3<K, V, GridCacheVersion>() {
-                    @Override public void applyx(K key, V val, GridCacheVersion ver) throws IgniteCheckedException {
-                        assert ver != null;
+        if (replaceExisting) {
+            if (ctx.store().isLocalStore()) {
+                assert false;
 
-                        if (p != null && !p.apply(key, val))
-                            return;
+                return null;
+            }
+            else {
+                return ctx.closures().callLocalSafe(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        loadAll(keys);
 
-                        if (ctx.portableEnabled()) {
-                            key = (K)ctx.marshalToPortable(key);
-                            val = (V)ctx.marshalToPortable(val);
+                        return null;
+                    }
+                });
+            }
+        }
+        else {
+            return ctx.closures().callLocalSafe(new Callable<Void>() {
+                @Override public Void call() throws Exception {
+                    // Version for all loaded entries.
+                    final GridCacheVersion ver0 = ctx.versions().nextForLoad();
+                    final boolean replicate = ctx.isDrEnabled();
+                    final long topVer = ctx.affinity().affinityTopologyVersion();
+
+                    ctx.store().loadAllFromStore(null, keys, new CIX2<K, V>() {
+                        @Override public void applyx(K key, V val)
+                            throws PortableException {
+                            if (ctx.portableEnabled()) {
+                                key = (K)ctx.marshalToPortable(key);
+                                val = (V)ctx.marshalToPortable(val);
+                            }
+
+                            GridCacheEntryEx<K, V> entry = entryEx(key, false);
+
+                            try {
+                                entry.initialValue(val, null, ver0, 0, -1, false, topVer, replicate ? DR_LOAD : DR_NONE);
+                            }
+                            catch (IgniteCheckedException e) {
+                                throw new IgniteException("Failed to put cache value: " + entry, e);
+                            }
+                            catch (GridCacheEntryRemovedException ignore) {
+                                if (log.isDebugEnabled())
+                                    log.debug("Got removed entry during loadCache (will ignore): " + entry);
+                            }
+                            finally {
+                                ctx.evicts().touch(entry, topVer);
+                            }
+
+                            CU.unwindEvicts(ctx);
                         }
+                    });
 
-                        GridVersionedEntry<K,V> e = new GridRawVersionedEntry<>(key, null, val, null, ttl, 0, ver);
+                    return null;
+                }
+            });
+        }
+    }
 
-                        e.marshal(ctx.marshaller());
+    /**
+     * @param keys Keys.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void loadAllLocalStore(final Set<? extends K> keys) throws IgniteCheckedException {
+        assert ctx.store().isLocalStore();
 
-                        col.add(e);
+        try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+            ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
 
-                        if (col.size() == ldr.perNodeBufferSize()) {
-                            ldr.addData(col);
+            LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, 0);
 
-                            col.clear();
-                        }
+            ctx.store().loadAllFromLocalStore(null, keys, c);
+
+            c.onDone();
+        }
+    }
+
+    /**
+     * @param keys Keys.
+     * @throws IgniteCheckedException If failed.
+     */
+    private void loadAll(final Set<? extends K> keys) throws IgniteCheckedException {
+        try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+            final Collection<Map.Entry<K, V>> col = new ArrayList<>(ldr.perNodeBufferSize());
+
+            ctx.store().loadAllFromStore(null, keys, new CIX2<K, V>() {
+                @Override public void applyx(K key, V val) throws IgniteCheckedException {
+                    if (ctx.portableEnabled()) {
+                        key = (K)ctx.marshalToPortable(key);
+                        val = (V)ctx.marshalToPortable(val);
                     }
-                }, args);
 
-                if (!col.isEmpty())
-                    ldr.addData(col);
+                    col.add(new GridMapEntry<>(key, val));
+
+                    if (col.size() == ldr.perNodeBufferSize()) {
+                        ldr.addData(col);
+
+                        col.clear();
+                    }
+                }
+            });
+
+            if (!col.isEmpty())
+                ldr.addData(col);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void loadCache(final IgniteBiPredicate<K, V> p, final long ttl, Object[] args)
+        throws IgniteCheckedException {
+        final boolean replicate = ctx.isDrEnabled();
+        final long topVer = ctx.affinity().affinityTopologyVersion();
+
+        if (ctx.store().isLocalStore()) {
             try (final IgniteDataLoader<K, V> ldr = ctx.kernalContext().<K, V>dataLoad().dataLoader(ctx.namex(), false)) {
+                ldr.updater(new GridDrDataLoadCacheUpdater<K, V>());
+
+                LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, ttl);
+
+                ctx.store().loadCache(c, args);
+
+                c.onDone();
             }
         }
         else {
@@ -3439,8 +3540,7 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
     @Override public IgniteFuture<?> loadCacheAsync(final IgniteBiPredicate<K, V> p, final long ttl, final Object[] args) {
         return ctx.closures().callLocalSafe(
             ctx.projectSafe(new Callable<Object>() {
-                @Nullable
-                @Override public Object call() throws IgniteCheckedException {
+                @Nullable @Override public Object call() throws IgniteCheckedException {
                     loadCache(p, ttl, args);
 
                     return null;
@@ -4509,7 +4609,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         ctx.denyOnFlags(F.asList(LOCAL, READ));
 
         return ctx.closures().callLocalSafe(ctx.projectSafe(new Callable<V>() {
-            @Nullable @Override public V call() throws IgniteCheckedException {
+            @Nullable
+            @Override
+            public V call() throws IgniteCheckedException {
                 return reload(key, filter);
             }
         }), true);
@@ -4537,7 +4639,9 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
         ctx.denyOnFlag(READ);
 
         return ctx.closures().callLocalSafe(ctx.projectSafe(new GPC() {
-            @Nullable @Override public Object call() throws IgniteCheckedException {
+            @Nullable
+            @Override
+            public Object call() throws IgniteCheckedException {
                 reloadAll(filter);
 
                 return null;
@@ -4547,11 +4651,13 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
 
     /**
      * @param keys Keys.
+     * @param deserializePortable Deserialize portable flag.
      * @param filter Filter to evaluate.
      * @return Read future.
      */
     public IgniteFuture<Map<K, V>> getAllAsync(@Nullable Collection<? extends K> keys,
-        boolean deserializePortable, @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter) {
+        boolean deserializePortable,
+        @Nullable IgnitePredicate<GridCacheEntry<K, V>> filter) {
         String taskName = ctx.kernalContext().job().currentTaskName();
 
         if (ctx.portableEnabled() && !F.isEmpty(keys)) {
@@ -4562,8 +4668,14 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
             });
         }
 
-        return getAllAsync(keys, ctx.hasFlag(GET_PRIMARY), /*skip tx*/false, null, null, taskName,
-            deserializePortable, filter);
+        return getAllAsync(keys,
+            ctx.hasFlag(GET_PRIMARY),
+            /*skip tx*/false,
+            null,
+            null,
+            taskName,
+            deserializePortable,
+            filter);
     }
 
     /**
@@ -5060,4 +5172,67 @@ public abstract class GridCacheAdapter<K, V> extends GridMetadataAwareAdapter im
             return S.toString(GetExpiryPolicy.class, this);
         }
     }
+
+    /**
+     *
+     */
+    private class LocalStoreLoadClosure extends CIX3<K, V, GridCacheVersion> {
+        /** */
+        final IgniteBiPredicate<K, V> p;
+
+        /** */
+        final Collection<Map.Entry<K, V>> col;
+
+        /** */
+        final IgniteDataLoader<K, V> ldr;
+
+        /** */
+        final long ttl;
+
+        /**
+         * @param p Key/value predicate.
+         * @param ldr Loader.
+         * @param ttl TTL.
+         */
+        private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, IgniteDataLoader<K, V> ldr, long ttl) {
+            this.p = p;
+            this.ldr = ldr;
+            this.ttl = ttl;
+
+            col = new ArrayList<>(ldr.perNodeBufferSize());
+        }
+
+        /** {@inheritDoc} */
+        @Override public void applyx(K key, V val, GridCacheVersion ver) throws IgniteCheckedException {
+            assert ver != null;
+
+            if (p != null && !p.apply(key, val))
+                return;
+
+            if (ctx.portableEnabled()) {
+                key = (K)ctx.marshalToPortable(key);
+                val = (V)ctx.marshalToPortable(val);
+            }
+
+            GridVersionedEntry<K,V> e = new GridRawVersionedEntry<>(key, null, val, null, ttl, 0, ver);
+
+            e.marshal(ctx.marshaller());
+
+            col.add(e);
+
+            if (col.size() == ldr.perNodeBufferSize()) {
+                ldr.addData(col);
+
+                col.clear();
+            }
+        }
+
+        /**
+         * Adds remaining data to loader.
+         */
+        void onDone() {
+            if (!col.isEmpty())
+                ldr.addData(col);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
index 41a8e64..11e11a3 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheStoreManager.java
@@ -208,7 +208,21 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
      * @return Loaded value, possibly <tt>null</tt>.
      * @throws IgniteCheckedException If data loading failed.
      */
+    @SuppressWarnings("unchecked")
     @Nullable public V loadFromStore(@Nullable IgniteTx tx, K key) throws IgniteCheckedException {
+        return (V)loadFromStore(tx, key, true);
+    }
+
+    /**
+     * Loads data from persistent store.
+     *
+     * @param tx Cache transaction.
+     * @param key Cache key.
+     * @param convert Convert flag.
+     * @return Loaded value, possibly <tt>null</tt>.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    @Nullable public Object loadFromStore(@Nullable IgniteTx tx, K key, boolean convert) throws IgniteCheckedException {
         if (store != null) {
             if (key instanceof GridCacheInternal)
                 // Never load internal keys from store as they are never persisted.
@@ -220,16 +234,22 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Loading value from store for key: " + key);
 
-            V val = null;
+            Object val = null;
 
             boolean ses = initSession(tx);
 
             try {
-                val = convert(singleThreadGate.load(key));
+                val = singleThreadGate.load(key);
             }
             catch (ClassCastException e) {
                 handleClassCastException(e);
             }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
             finally {
                 if (ses)
                     sesHolder.set(null);
@@ -238,7 +258,13 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
             if (log.isDebugEnabled())
                 log.debug("Loaded value from store [key=" + key + ", val=" + val + ']');
 
-            return cctx.portableEnabled() ? (V)cctx.marshalToPortable(val) : val;
+            if (convert) {
+                val = convert(val);
+
+                return cctx.portableEnabled() ? cctx.marshalToPortable(val) : val;
+            }
+            else
+                return val;
         }
 
         return null;
@@ -263,6 +289,21 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
     }
 
     /**
+     * @param tx Cache transaction.
+     * @param keys Cache keys.
+     * @param vis Closer to cache loaded elements.
+     * @throws IgniteCheckedException If data loading failed.
+     */
+    public void loadAllFromLocalStore(@Nullable IgniteTx tx,
+        Collection<? extends K> keys,
+        final GridInClosure3<K, V, GridCacheVersion> vis) throws IgniteCheckedException {
+        assert store != null;
+        assert locStore;
+
+        loadAllFromStore(null, keys, null, vis);
+    }
+
+    /**
      * Loads data from persistent store.
      *
      * @param tx Cache transaction.
@@ -276,85 +317,115 @@ public class GridCacheStoreManager<K, V> extends GridCacheManagerAdapter<K, V> {
         Collection<? extends K> keys,
         final IgniteBiInClosure<K, V> vis) throws IgniteCheckedException {
         if (store != null) {
-            if (!keys.isEmpty()) {
-                if (keys.size() == 1) {
-                    K key = F.first(keys);
+            loadAllFromStore(null, keys, vis, null);
 
-                    vis.apply(key, loadFromStore(tx, key));
+            return true;
+        }
+        else {
+            for (K key : keys)
+                vis.apply(key, null);
+        }
 
-                    return true;
-                }
+        return false;
+    }
 
-                Collection<? extends K> keys0 = convertPortable ?
-                    F.viewReadOnly(keys, new C1<K, K>() {
-                        @Override public K apply(K k) {
-                            return (K)cctx.unwrapPortableIfNeeded(k, false);
-                        }
-                    }) :
-                    keys;
+    /**
+     * @param tx Cache transaction.
+     * @param keys Keys to load.
+     * @param vis Key/value closure (only one of vis or verVis can be specified).
+     * @param verVis Key/value/version closure (only one of vis or verVis can be specified).
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("unchecked")
+    private void loadAllFromStore(@Nullable IgniteTx tx,
+        Collection<? extends K> keys,
+        final @Nullable IgniteBiInClosure<K, V> vis,
+        final @Nullable GridInClosure3<K, V, GridCacheVersion> verVis) throws IgniteCheckedException {
+        assert vis != null ^ verVis != null;
+        assert verVis == null || locStore;
 
-                if (log.isDebugEnabled())
-                    log.debug("Loading values from store for keys: " + keys0);
+        final boolean convert = verVis == null;
 
-                boolean ses = initSession(tx);
+        if (!keys.isEmpty()) {
+            if (keys.size() == 1) {
+                K key = F.first(keys);
 
-                try {
-                    if (keys.size() > singleThreadGate.loadAllThreshold()) {
-                        Map<K, Object> map = store.loadAll(keys0);
+                if (convert)
+                    vis.apply(key, loadFromStore(tx, key));
+                else {
+                    IgniteBiTuple<V, GridCacheVersion> t =
+                        (IgniteBiTuple<V, GridCacheVersion>)loadFromStore(tx, key, false);
+
+                    if (t != null)
+                        verVis.apply(key, t.get1(), t.get2());
+                }
 
-                        if (map != null) {
-                            for (Map.Entry<K, Object> e : map.entrySet()) {
-                                K k = e.getKey();
+                return;
+            }
+
+            Collection<? extends K> keys0 = convertPortable ?
+                F.viewReadOnly(keys, new C1<K, K>() {
+                    @Override public K apply(K k) {
+                        return (K)cctx.unwrapPortableIfNeeded(k, false);
+                    }
+                }) :
+                keys;
 
-                                V v = convert(e.getValue());
+            if (log.isDebugEnabled())
+                log.debug("Loading values from store for keys: " + keys0);
 
-                                if (cctx.portableEnabled()) {
-                                    k = (K)cctx.marshalToPortable(k);
-                                    v = (V)cctx.marshalToPortable(v);
-                                }
+            boolean ses = initSession(tx);
 
-                                vis.apply(k, v);
+            try {
+                CI2<K, Object> c = new CI2<K, Object>() {
+                    @Override public void apply(K k, Object val) {
+                        if (convert) {
+                            V v = convert(val);
+
+                            if (cctx.portableEnabled()) {
+                                k = (K)cctx.marshalToPortable(k);
+                                v = (V)cctx.marshalToPortable(v);
                             }
+
+                            vis.apply(k, v);
+                        }
+                        else {
+                            IgniteBiTuple<V, GridCacheVersion> v = (IgniteBiTuple<V, GridCacheVersion>)val;
+
+                            if (v != null)
+                                verVis.apply(k, v.get1(), v.get2());
                         }
                     }
-                    else {
-                        singleThreadGate.loadAll(keys0, new CI2<K, Object>() {
-                            @Override public void apply(K k, Object o) {
-                                V v = convert(o);
+                };
 
-                                if (cctx.portableEnabled()) {
-                                    k = (K)cctx.marshalToPortable(k);
-                                    v = (V)cctx.marshalToPortable(v);
-                                }
+                if (keys.size() > singleThreadGate.loadAllThreshold()) {
+                    Map<K, Object> map = store.loadAll(keys0);
 
-                                vis.apply(k, v);
-                            }
-                        });
+                    if (map != null) {
+                        for (Map.Entry<K, Object> e : map.entrySet())
+                            c.apply(e.getKey(), e.getValue());
                     }
                 }
-                catch (ClassCastException e) {
-                    handleClassCastException(e);
-                }
-                catch (Exception e) {
-                    throw U.cast(e);
-                }
-                finally {
-                    if (ses)
-                        sesHolder.set(null);
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Loaded values from store for keys: " + keys0);
+                else
+                    singleThreadGate.loadAll(keys0, c);
+            }
+            catch (ClassCastException e) {
+                handleClassCastException(e);
+            }
+            catch (CacheLoaderException e) {
+                throw new IgniteCheckedException(e);
+            }
+            catch (Exception e) {
+                throw new IgniteCheckedException(new CacheLoaderException(e));
+            }
+            finally {
+                if (ses)
+                    sesHolder.set(null);
             }
 
-            return true;
-        }
-        else {
-            for (K key : keys)
-                vis.apply(key, null);
+            if (log.isDebugEnabled())
+                log.debug("Loaded values from store for keys: " + keys0);
         }
-
-        return false;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 8a3d0d1..0dbf7dd 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -23,6 +23,7 @@ import org.apache.ignite.lang.*;
 import org.apache.ignite.plugin.security.*;
 import org.apache.ignite.transactions.*;
 import org.gridgain.grid.cache.*;
+import org.gridgain.grid.kernal.*;
 import org.gridgain.grid.kernal.managers.communication.*;
 import org.gridgain.grid.kernal.processors.cache.*;
 import org.gridgain.grid.kernal.processors.cache.distributed.dht.*;
@@ -871,6 +872,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
             // Optimistically expect that all keys are available locally (avoid creation of get future).
             for (K key : keys) {
+                if (key == null)
+                    return new GridFinishedFuture<>(ctx.kernalContext(), new IgniteNullArgumentCheckedException("Key is null."));
+
                 GridCacheEntryEx<K, V> entry = null;
 
                 while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/738c67fd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java
new file mode 100644
index 0000000..5cbd2cb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/integration/IgniteCacheLoadAllAbstractTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.integration;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.processors.cache.*;
+
+import javax.cache.integration.*;
+import java.util.*;
+
+/**
+ * Test for {@link javax.cache.Cache#loadAll(Set, boolean, CompletionListener)}.
+ */
+public abstract class IgniteCacheLoadAllAbstractTest extends IgniteCacheAbstractTest {
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadAll() throws Exception {
+        IgniteCache<Integer, String> cache = jcache(0);
+
+        for (int i = 0; i < 1000; i++)
+            cache.put(i, String.valueOf(i));
+
+        stopAllGrids();
+
+        startGrids();
+
+        cache = jcache(0);
+
+        Set<Integer> keys = new HashSet<>();
+
+        for (int i = 0; i < 100; i++)
+            keys.add(i);
+
+        Set<Integer> nonExistKeys = new HashSet<>();
+
+        for (int i = 10_000; i < 10_010; i++)
+            nonExistKeys.add(i);
+
+        keys.addAll(nonExistKeys);
+
+        CompletionListener lsnr = new CompletionListenerFuture();
+
+        cache.loadAll(keys, false, lsnr);
+
+        for (int i = 0; i < gridCount(); i++) {
+
+        }
+    }
+}


Mime
View raw message