ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yzhda...@apache.org
Subject [03/16] incubator-ignite git commit: # ignite-60
Date Mon, 26 Jan 2015 15:54:20 GMT
# ignite-60


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/e3598a95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/e3598a95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/e3598a95

Branch: refs/heads/ignite-107
Commit: e3598a952057752a562f5c6a4ebf95778d2e5fa1
Parents: 7faa36f
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Jan 26 12:40:45 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Jan 26 12:40:45 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/IgniteCache.java     |  79 +++++++
 .../apache/ignite/cache/CacheProjection.java    |   6 +
 .../processors/cache/GridCacheAdapter.java      | 117 +++++++++-
 .../cache/GridCacheProjectionImpl.java          |   5 +
 .../processors/cache/GridCacheProxyImpl.java    |  12 +
 .../processors/cache/IgniteCacheProxy.java      | 229 +++++++++++--------
 .../dht/GridCacheGlobalLoadTest.java            |  78 ++++++-
 .../GridCachePartitionedLoadCacheSelfTest.java  |  25 +-
 8 files changed, 443 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
index d99ccc6..0655195 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java
@@ -101,6 +101,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
      *      {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method.
      * @throws CacheException If loading failed.
      */
+    @IgniteAsyncSupported
     public void loadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;
 
     /**
@@ -124,6 +125,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
      *      {@link CacheStore#loadCache(IgniteBiInClosure, Object...)} method.
      * @throws CacheException If loading failed.
      */
+    @IgniteAsyncSupported
     public void localLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws CacheException;
 
     /**
@@ -155,6 +157,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
      * @throws CacheException If put operation failed.
      * @throws org.apache.ignite.internal.processors.cache.CacheFlagException If projection flags validation failed.
      */
+    @IgniteAsyncSupported
     @Nullable public V getAndPutIfAbsent(K key, V val) throws CacheException;
 
     /**
@@ -295,6 +298,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
      * @param peekModes Optional peek modes. If not provided, then total cache size is returned.
      * @return Cache size across all nodes.
      */
+    @IgniteAsyncSupported
     public int size(CachePeekMode... peekModes) throws CacheException;
 
     /**
@@ -313,6 +317,7 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
      * will be returned for {@link EntryProcessor}s that return a
      * <code>null</code> value for a key.
      */
+    @IgniteAsyncSupported
     <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args);
 
     /**
@@ -353,4 +358,78 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K,
V>, IgniteAsyncS
      * @return Projection for portable objects.
      */
     public <K1, V1> IgniteCache<K1, V1> keepPortable();
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public V get(K key);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public Map<K, V> getAll(Set<? extends K> keys);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public boolean containsKey(K key);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public void put(K key, V val);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public V getAndPut(K key, V val);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public void putAll(Map<? extends K, ? extends V> map);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public boolean putIfAbsent(K key, V val);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public boolean remove(K key);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public boolean remove(K key, V oldVal);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public V getAndRemove(K key);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public boolean replace(K key, V oldVal, V newVal);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public boolean replace(K key, V val);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public V getAndReplace(K key, V val);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public void removeAll(Set<? extends K> keys);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public void removeAll();
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public void clear();
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments);
+
+    /** {@inheritDoc} */
+    @IgniteAsyncSupported
+    @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+        EntryProcessor<K, V, T> entryProcessor,
+        Object... args);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
index 13f389d..4d61d1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheProjection.java
@@ -344,6 +344,12 @@ public interface CacheProjection<K, V> extends Iterable<CacheEntry<K,
V>> {
     public boolean containsKey(K key);
 
     /**
+     * @param key Key.
+     * @return Future.
+     */
+    public IgniteFuture<Boolean> containsKeyAsync(K key);
+
+    /**
      * Returns {@code true} if this cache contains given value.
      *
      * @param val Value to check.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3c0ed75..474441b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -639,6 +639,20 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
+        return containsKeyAsync(key, null);
+    }
+
+    /**
+     * @param key Key.
+     * @param filter Filter.
+     * @return Future.
+     */
+    public IgniteFuture<Boolean> containsKeyAsync(K key, @Nullable IgnitePredicate<CacheEntry<K, V>> filter) {
+        return new GridFinishedFuture<>(ctx.kernalContext(), containsKey(key, filter));
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean containsValue(V val) {
         return containsValue(val, null);
     }
@@ -3575,6 +3589,37 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
         }
     }
 
+    /**
+     * @param p Predicate.
+     * @param args Arguments.
+     * @throws IgniteCheckedException If failed.
+     */
+    void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException {
+        ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());
+
+        IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover();
+
+        comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));
+    }
+
+    /**
+     * @param p Predicate.
+     * @param args Arguments.
+     * @throws IgniteCheckedException If failed.
+     */
+    IgniteFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args)
+        throws IgniteCheckedException {
+        ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());
+
+        IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover();
+
+        comp = comp.enableAsync();
+
+        comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));
+
+        return comp.future();
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public CacheEntry<K, V> randomEntry() {
         GridCacheMapEntry<K, V> e = map.randomEntry();
@@ -5036,7 +5081,7 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
 
         /** {@inheritDoc} */
         @Override public Integer apply(Object o) {
-            GridCache<Object, Object> cache = ((GridEx) ignite).cachex(cacheName);
+            GridCache<Object, Object> cache = ((GridEx)ignite).cachex(cacheName);
 
             return primaryOnly ? cache.primarySize() : cache.size();
         }
@@ -5367,4 +5412,74 @@ public abstract class GridCacheAdapter<K, V> implements GridCache<K,
V>,
                 ldr.addData(col);
         }
     }
+
+    /**
+     *
+     */
+    private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** */
+        private String cacheName;
+
+        /** */
+        private IgniteBiPredicate<K, V> p;
+
+        /** */
+        private Object[] args;
+
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public LoadCacheClosure() {
+            // No-op.
+        }
+
+        /**
+         * @param cacheName Cache name.
+         * @param p Predicate.
+         * @param args Arguments.
+         */
+        private LoadCacheClosure(String cacheName, IgniteBiPredicate<K, V> p, Object[] args) {
+            this.cacheName = cacheName;
+            this.p = p;
+            this.args = args;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void call() throws Exception {
+            IgniteCache<K, V> cache = ignite.jcache(cacheName);
+
+            assert cache != null : cacheName;
+
+            cache.localLoadCache(p, args);
+
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeObject(p);
+
+            out.writeObject(args);
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            p = (IgniteBiPredicate<K, V>)in.readObject();
+
+            args = (Object[])in.readObject();
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(LoadCacheClosure.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
index 6e7ce4c..b1e564a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProjectionImpl.java
@@ -613,6 +613,11 @@ public class GridCacheProjectionImpl<K, V> implements GridCacheProjectionEx<K,
V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
+        return cache.containsKeyAsync(key, entryFilter(false));
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean containsValue(V val) {
         return cache.containsValue(val, entryFilter(true));
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 44bdc3f..504fe7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -340,6 +340,18 @@ public class GridCacheProxyImpl<K, V> implements GridCacheProxy<K,
V>, Externali
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteFuture<Boolean> containsKeyAsync(K key) {
+        GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
+
+        try {
+            return delegate.containsKeyAsync(key);
+        }
+        finally {
+            gate.leave(prev);
+        }
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean containsValue(V val) {
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index eacb5b3..ff40c5d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -127,11 +127,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                ClusterGroup nodes = ctx.kernalContext().grid().cluster().forCache(ctx.name());
-
-                IgniteCompute comp = ctx.kernalContext().grid().compute(nodes).withNoFailover();
-
-                comp.broadcast(new LoadCacheClosure<>(ctx.name(), p, args));
+                if (isAsync())
+                    curFut.set(ctx.cache().globalLoadCacheAsync(p, args));
+                else
+                    ctx.cache().globalLoadCache(p, args);
             }
             finally {
                 gate.leave(prev);
@@ -148,7 +147,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.<K, V>cache().loadCache(p, 0, args);
+                if (isAsync())
+                    curFut.set(delegate.<K, V>cache().loadCacheAsync(p, 0, args));
+                else
+                    delegate.<K, V>cache().loadCache(p, 0, args);
             }
             finally {
                 gate.leave(prev);
@@ -165,7 +167,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.putIfAbsent(key, val);
+                if (isAsync()) {
+                    curFut.set(delegate.putIfAbsentAsync(key, val));
+
+                    return null;
+                }
+                else
+                    return delegate.putIfAbsent(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -282,7 +290,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.size();
+            return ctx.cache().globalSize();
+        }
+        catch (IgniteCheckedException e) {
+            throw cacheException(e);
         }
         finally {
             gate.leave(prev);
@@ -308,7 +319,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.get(key);
+                if (isAsync()) {
+                    curFut.set(delegate.getAsync(key));
+
+                    return null;
+                }
+                else
+                    return delegate.get(key);
             }
             finally {
                 gate.leave(prev);
@@ -325,7 +342,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.getAll(keys);
+                if (isAsync()) {
+                    curFut.set(delegate.getAllAsync(keys));
+
+                    return null;
+                }
+                else
+                    return delegate.getAll(keys);
             }
             finally {
                 gate.leave(prev);
@@ -345,7 +368,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.getAll(keys);
+                if (isAsync()) {
+                    curFut.set(delegate.getAllAsync(keys));
+
+                    return null;
+                }
+                else
+                    return delegate.getAll(keys);
             }
             finally {
                 gate.leave(prev);
@@ -395,7 +424,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
         GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
         try {
-            return delegate.containsKey(key);
+            if (isAsync()) {
+                curFut.set(delegate.containsKeyAsync(key));
+
+                return false;
+            }
+            else
+                return delegate.containsKey(key);
         }
         finally {
             gate.leave(prev);
@@ -439,7 +474,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.putx(key, val);
+                if (isAsync())
+                    curFut.set(delegate.putxAsync(key, val));
+                else
+                    delegate.putx(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -456,7 +494,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.put(key, val);
+                if (isAsync()) {
+                    curFut.set(delegate.putAsync(key, val));
+
+                    return null;
+                }
+                else
+                    return delegate.put(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -473,7 +517,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.putAll(map);
+                if (isAsync())
+                    curFut.set(delegate.putAllAsync(map));
+                else
+                    delegate.putAll(map);
             }
             finally {
                 gate.leave(prev);
@@ -490,7 +537,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.putxIfAbsent(key, val);
+                if (isAsync()) {
+                    curFut.set(delegate.putxIfAbsentAsync(key, val));
+
+                    return false;
+                }
+                else
+                    return delegate.putxIfAbsent(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -507,7 +560,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.removex(key);
+                if (isAsync()) {
+                    curFut.set(delegate.removexAsync(key));
+
+                    return false;
+                }
+                else
+                    return delegate.removex(key);
             }
             finally {
                 gate.leave(prev);
@@ -524,7 +583,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.remove(key, oldVal);
+                if (isAsync()) {
+                    curFut.set(delegate.removeAsync(key, oldVal));
+
+                    return false;
+                }
+                else
+                    return delegate.remove(key, oldVal);
             }
             finally {
                 gate.leave(prev);
@@ -541,7 +606,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.remove(key);
+                if (isAsync()) {
+                    curFut.set(delegate.removeAsync(key));
+
+                    return null;
+                }
+                else
+                    return delegate.remove(key);
             }
             finally {
                 gate.leave(prev);
@@ -558,7 +629,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.replace(key, oldVal, newVal);
+                if (isAsync()) {
+                    curFut.set(delegate.replaceAsync(key, oldVal, newVal));
+
+                    return false;
+                }
+                else
+                    return delegate.replace(key, oldVal, newVal);
             }
             finally {
                 gate.leave(prev);
@@ -575,7 +652,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.replacex(key, val);
+                if (isAsync()) {
+                    curFut.set(delegate.replacexAsync(key, val));
+
+                    return false;
+                }
+                else
+                    return delegate.replacex(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -592,7 +675,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return delegate.replace(key, val);
+                if (isAsync()) {
+                    curFut.set(delegate.replaceAsync(key, val));
+
+                    return null;
+                }
+                else
+                    return delegate.replace(key, val);
             }
             finally {
                 gate.leave(prev);
@@ -609,7 +698,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.removeAll(keys);
+                if (isAsync())
+                    curFut.set(delegate.removeAllAsync(keys));
+                else
+                    delegate.removeAll(keys);
             }
             finally {
                 gate.leave(prev);
@@ -628,7 +720,10 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                delegate.removeAll(keys);
+                if (isAsync())
+                    curFut.set(delegate.removeAllAsync(keys));
+                else
+                    delegate.removeAll(keys);
             }
             finally {
                 gate.leave(prev);
@@ -717,7 +812,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return saveOrGet(delegate.invokeAllAsync(keys, entryProcessor, args));
+                if (isAsync()) {
+                    curFut.set(delegate.invokeAllAsync(keys, entryProcessor, args));
+
+                    return null;
+                }
+                else
+                    return delegate.invokeAll(keys, entryProcessor, args);
             }
             finally {
                 gate.leave(prev);
@@ -736,7 +837,13 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
             GridCacheProjectionImpl<K, V> prev = gate.enter(prj);
 
             try {
-                return saveOrGet(delegate.invokeAllAsync(map, args));
+                if (isAsync()) {
+                    curFut.set(delegate.invokeAllAsync(map, args));
+
+                    return null;
+                }
+                else
+                    return delegate.invokeAll(map, args);
             }
             finally {
                 gate.leave(prev);
@@ -976,74 +1083,4 @@ public class IgniteCacheProxy<K, V> extends IgniteAsyncSupportAdapter
implements
     @Override public String toString() {
         return S.toString(IgniteCacheProxy.class, this);
     }
-
-    /**
-     *
-     */
-    private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private String cacheName;
-
-        /** */
-        private IgniteBiPredicate<K, V> p;
-
-        /** */
-        private Object[] args;
-
-        /** */
-        @IgniteInstanceResource
-        private Ignite ignite;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public LoadCacheClosure() {
-            // No-op.
-        }
-
-        /**
-         * @param cacheName Cache name.
-         * @param p Predicate.
-         * @param args Arguments.
-         */
-        private LoadCacheClosure(String cacheName, IgniteBiPredicate<K, V> p, Object[] args) {
-            this.cacheName = cacheName;
-            this.p = p;
-            this.args = args;
-        }
-
-        /** {@inheritDoc} */
-        @Override public Void call() throws Exception {
-            IgniteCache<K, V> cache = ignite.jcache(cacheName);
-
-            assert cache != null : cacheName;
-
-            cache.localLoadCache(p, args);
-
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeObject(p);
-
-            out.writeObject(args);
-        }
-
-        /** {@inheritDoc} */
-        @SuppressWarnings("unchecked")
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            p = (IgniteBiPredicate<K, V>)in.readObject();
-
-            args = (Object[])in.readObject();
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(LoadCacheClosure.class, this);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java
index 01eb3e7..aabcc3a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheGlobalLoadTest.java
@@ -41,6 +41,9 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest {
     /** */
     private static ConcurrentMap<String, Object[]> map;
 
+    /** */
+    private static volatile boolean failStore;
+
     /** {@inheritDoc} */
     @Override protected int gridCount() {
         return 3;
@@ -65,11 +68,36 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest {
      * @throws Exception If failed.
      */
     public void testLoadCache() throws Exception {
+        loadCache(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLoadCacheAsync() throws Exception {
+        loadCache(true);
+    }
+
+    /**
+     * @param async If {@code true} uses asynchronous method.
+     * @throws Exception If failed.
+     */
+    private void loadCache(boolean async) throws Exception {
         IgniteCache<Integer, Integer> cache = jcache();
 
+        IgniteCache<Integer, Integer> asyncCache = cache.enableAsync();
+
+        assertTrue(asyncCache.isAsync());
+
         map = new ConcurrentHashMap8<>();
 
-        cache.loadCache(null, 1, 2, 3);
+        if (async) {
+            asyncCache.loadCache(null, 1, 2, 3);
+
+            asyncCache.future().get();
+        }
+        else
+            cache.loadCache(null, 1, 2, 3);
 
         assertEquals(3, map.size());
 
@@ -87,14 +115,28 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest
{
 
         map = new ConcurrentHashMap8<>();
 
-        cache.loadCache(new IgniteBiPredicate<Integer, Integer>() {
-            @Override public boolean apply(Integer key, Integer val) {
-                assertNotNull(key);
-                assertNotNull(val);
+        if (async) {
+            asyncCache.loadCache(new IgniteBiPredicate<Integer, Integer>() {
+                @Override public boolean apply(Integer key, Integer val) {
+                    assertNotNull(key);
+                    assertNotNull(val);
 
-                return key % 2 == 0;
-            }
-        }, 1, 2, 3, 4, 5, 6);
+                    return key % 2 == 0;
+                }
+            }, 1, 2, 3, 4, 5, 6);
+
+            asyncCache.future().get();
+        }
+        else {
+            cache.loadCache(new IgniteBiPredicate<Integer, Integer>() {
+                @Override public boolean apply(Integer key, Integer val) {
+                    assertNotNull(key);
+                    assertNotNull(val);
+
+                    return key % 2 == 0;
+                }
+            }, 1, 2, 3, 4, 5, 6);
+        }
 
         assertEquals(3, map.size());
 
@@ -115,10 +157,24 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest
{
     }
 
     /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        failStore = true;
+    }
+
+    /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
         map = null;
+
+        failStore = false;
+
+        IgniteCache<Integer, Integer> cache = jcache();
+
+        for (int i = 0; i < 7; i++)
+            cache.remove(i);
     }
 
     /** {@inheritDoc} */
@@ -153,7 +209,8 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest {
 
         /** {@inheritDoc} */
         @Override public Integer load(Integer key) {
-            assertEquals((Integer)5, key);
+            if (failStore)
+                assertEquals((Integer)5, key);
 
             return null;
         }
@@ -165,7 +222,8 @@ public class GridCacheGlobalLoadTest extends IgniteCacheAbstractTest {
 
         /** {@inheritDoc} */
         @Override public void delete(Object key) {
-            fail();
+            if (failStore)
+                fail();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/e3598a95/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java
index bf5271b..79ab0b8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedLoadCacheSelfTest.java
@@ -79,12 +79,35 @@ public class GridCachePartitionedLoadCacheSelfTest extends GridCommonAbstractTes
      * @throws Exception If failed.
      */
     public void testLocalLoadCache() throws Exception {
+        loadCache(false);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testLocalLoadCacheAsync() throws Exception {
+        loadCache(true);
+    }
+
+    /**
+     * @param async If {@code true} uses asynchronous load.
+     * @throws Exception If failed.
+     */
+    private void loadCache(boolean async) throws Exception {
         try {
             startGridsMultiThreaded(GRID_CNT);
 
             IgniteCache<Integer, String> cache = jcache(0);
 
-            cache.localLoadCache(null, PUT_CNT);
+            if (async) {
+                IgniteCache<Integer, String> asyncCache = cache.enableAsync();
+
+                asyncCache.localLoadCache(null, PUT_CNT);
+
+                asyncCache.future().get();
+            }
+            else
+                cache.localLoadCache(null, PUT_CNT);
 
             GridCache<Integer, String> cache0 = cache(0);
 


Mime
View raw message