ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [04/53] [abbrv] incubator-ignite git commit: IGNITE-49 merged from sprint-1 into ignite-49
Date Tue, 27 Jan 2015 16:40:24 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/c7a9a1db/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index 0000000,6109196..0a03e5c
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@@ -1,0 -1,1517 +1,1535 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.local.atomic;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.internal.processors.cache.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.transactions.*;
+ import org.apache.ignite.internal.processors.cache.local.*;
+ import org.apache.ignite.internal.processors.cache.transactions.*;
+ import org.apache.ignite.internal.util.future.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jetbrains.annotations.*;
+ import sun.misc.*;
+ 
+ import javax.cache.expiry.*;
+ import javax.cache.processor.*;
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static org.apache.ignite.cache.CacheFlag.*;
+ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
+ 
+ /**
+  * Non-transactional local cache.
+  */
+ public class GridLocalAtomicCache<K, V> extends GridCacheAdapter<K, V> {
+     /** */
+     private static final long serialVersionUID = 0L;
+ 
+     /** Unsafe instance. */
+     private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+ 
+     /** */
+     private GridCachePreloader<K,V> preldr;
+ 
+     /**
+      * Empty constructor required by {@link Externalizable}.
+      */
+     public GridLocalAtomicCache() {
+         // No-op.
+     }
+ 
+     /**
+      * @param ctx Cache context.
+      */
+     public GridLocalAtomicCache(GridCacheContext<K, V> ctx) {
+         super(ctx, ctx.config().getStartSize());
+ 
+         preldr = new GridCachePreloaderAdapter<>(ctx);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void init() {
+         map.setEntryFactory(new GridCacheMapEntryFactory<K, V>() {
+             @Override public GridCacheMapEntry<K, V> create(GridCacheContext<K, V> ctx, long topVer, K key, int hash,
+                 V val, @Nullable GridCacheMapEntry<K, V> next, long ttl, int hdrId) {
+                 return new GridLocalCacheEntry<K, V>(ctx, key, hash, val, next, ttl, hdrId) {
+                     @Override public CacheEntry<K, V> wrapFilterLocked() throws IgniteCheckedException {
+                         assert Thread.holdsLock(this);
+ 
+                         return new GridCacheFilterEvaluationEntry<>(key, rawGetOrUnmarshalUnlocked(true), this);
+                     }
+                 };
+             }
+         });
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void start() throws IgniteCheckedException {
+         // No-op.
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean isLocal() {
+         return true;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public GridCachePreloader<K, V> preloader() {
+         return preldr;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public V put(K key,
+         V val,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         long ttl,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+         A.notNull(key, "key", val, "val");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return (V)updateAllInternal(UPDATE,
+             Collections.singleton(key),
+             Collections.singleton(val),
+             null,
+             expiryPerCall(),
+             true,
+             false,
+             filter,
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public boolean putx(K key,
+         V val,
+         @Nullable GridCacheEntryEx<K, V> cached,
+         long ttl,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException {
+         A.notNull(key, "key", val, "val");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return (Boolean)updateAllInternal(UPDATE,
+             Collections.singleton(key),
+             Collections.singleton(val),
+             null,
+             expiryPerCall(),
+             false,
+             false,
+             filter,
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean putx(K key,
+         V val,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+         A.notNull(key, "key", val, "val");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return (Boolean)updateAllInternal(UPDATE,
+             Collections.singleton(key),
+             Collections.singleton(val),
+             null,
+             expiryPerCall(),
+             false,
+             false,
+             filter,
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<V> putAsync(K key,
+         V val,
+         @Nullable GridCacheEntryEx<K, V> entry,
+         long ttl,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         A.notNull(key, "key", val, "val");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return updateAllAsync0(F0.asMap(key, val),
+             null,
+             null,
+             true,
+             false,
+             filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<Boolean> putxAsync(K key,
+         V val,
+         @Nullable GridCacheEntryEx<K, V> entry,
+         long ttl,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         A.notNull(key, "key", val, "val");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return updateAllAsync0(F0.asMap(key, val),
+             null,
+             null,
+             false,
+             false,
+             filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public V putIfAbsent(K key, V val) throws IgniteCheckedException {
+         return put(key, val, ctx.noPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<V> putIfAbsentAsync(K key, V val) {
+         return putAsync(key, val, ctx.noPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean putxIfAbsent(K key, V val) throws IgniteCheckedException {
+         return putx(key, val, ctx.noPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> putxIfAbsentAsync(K key, V val) {
+         return putxAsync(key, val, ctx.noPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public V replace(K key, V val) throws IgniteCheckedException {
+         return put(key, val, ctx.hasPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<V> replaceAsync(K key, V val) {
+         return putAsync(key, val, ctx.hasPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean replacex(K key, V val) throws IgniteCheckedException {
+         return putx(key, val, ctx.hasPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> replacexAsync(K key, V val) {
+         return putxAsync(key, val, ctx.hasPeekArray());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException {
+         return putx(key, newVal, ctx.equalsPeekArray(oldVal));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+         return putxAsync(key, newVal, ctx.equalsPeekArray(oldVal));
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public GridCacheReturn<V> replacex(K key, V oldVal, V newVal) throws IgniteCheckedException {
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return (GridCacheReturn<V>)updateAllInternal(UPDATE,
+             Collections.singleton(key),
+             Collections.singleton(newVal),
+             null,
+             expiryPerCall(),
+             true,
+             true,
+             ctx.equalsPeekArray(oldVal),
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public GridCacheReturn<V> removex(K key, V val) throws IgniteCheckedException {
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return (GridCacheReturn<V>)updateAllInternal(DELETE,
+             Collections.singleton(key),
+             null,
+             null,
+             expiryPerCall(),
+             true,
+             true,
+             ctx.equalsPeekArray(val),
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<GridCacheReturn<V>> removexAsync(K key, V val) {
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return removeAllAsync0(F.asList(key), true, true, ctx.equalsPeekArray(val));
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<GridCacheReturn<V>> replacexAsync(K key, V oldVal, V newVal) {
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return updateAllAsync0(F.asMap(key, newVal),
+             null,
+             null,
+             true,
+             true,
+             ctx.equalsPeekArray(oldVal));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void putAll(Map<? extends K, ? extends V> m,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+         ctx.denyOnLocalRead();
+ 
+         updateAllInternal(UPDATE,
+             m.keySet(),
+             m.values(),
+             null,
+             expiryPerCall(),
+             false,
+             false,
+             filter,
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> putAllAsync(Map<? extends K, ? extends V> m,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         ctx.denyOnLocalRead();
+ 
+         return updateAllAsync0(m,
+             null,
+             null,
+             false,
+             false,
+             filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public V remove(K key,
+         @Nullable GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException {
+         ctx.denyOnLocalRead();
+ 
+         return (V)updateAllInternal(DELETE,
+             Collections.singleton(key),
+             null,
+             null,
+             expiryPerCall(),
+             true,
+             false,
+             filter,
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<V> removeAsync(K key,
+         @Nullable GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         ctx.denyOnLocalRead();
+ 
+         return removeAllAsync0(Collections.singletonList(key), true, false, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public void removeAll(Collection<? extends K> keys,
+         IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException {
+         ctx.denyOnLocalRead();
+ 
+         updateAllInternal(DELETE,
+             keys,
+             null,
+             null,
+             expiryPerCall(),
+             false,
+             false,
+             filter,
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> removeAllAsync(Collection<? extends K> keys,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         ctx.denyOnLocalRead();
+ 
+         return removeAllAsync0(keys, false, false, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public boolean removex(K key,
+         @Nullable GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException {
++        boolean statsEnabled = ctx.config().isStatisticsEnabled();
++
++        long start = statsEnabled ? System.nanoTime() : 0L;
++
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
 -        return (Boolean)updateAllInternal(DELETE,
++        Boolean removed = (Boolean)updateAllInternal(DELETE,
+             Collections.singleton(key),
+             null,
+             null,
+             expiryPerCall(),
+             false,
+             false,
+             filter,
+             ctx.writeThrough());
++
++        if (statsEnabled && removed)
++            metrics0().addRemoveTimeNanos(System.nanoTime() - start);
++
++        return  removed;
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<Boolean> removexAsync(K key,
+         @Nullable GridCacheEntryEx<K, V> entry,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return removeAllAsync0(Collections.singletonList(key), false, false, filter);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean remove(K key, V val) throws IgniteCheckedException {
+         A.notNull(key, "key");
+ 
+         ctx.denyOnLocalRead();
+ 
+         return (Boolean)updateAllInternal(DELETE,
+             Collections.singleton(key),
+             null,
+             null,
+             expiryPerCall(),
+             false,
+             false,
+             ctx.equalsPeekArray(val),
+             ctx.writeThrough());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> removeAsync(K key, V val) {
+         return removexAsync(key, ctx.equalsPeekArray(val));
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public void removeAll(IgnitePredicate<CacheEntry<K, V>>[] filter) throws IgniteCheckedException {
+         removeAll(keySet(filter));
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<?> removeAllAsync(IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return removeAllAsync(keySet(filter), filter);
+     }
+ 
+     /** {@inheritDoc} */
+ 
+     @SuppressWarnings("unchecked")
+     @Override @Nullable public V get(K key, boolean deserializePortable,
+         @Nullable IgnitePredicate<CacheEntry<K, V>> filter) throws IgniteCheckedException {
+         ctx.denyOnFlag(LOCAL);
+ 
+         String taskName = ctx.kernalContext().job().currentTaskName();
+ 
+         Map<K, V> m = getAllInternal(Collections.singleton(key),
+             filter != null ? new IgnitePredicate[]{filter} : null,
+             ctx.isSwapOrOffheapEnabled(),
+             ctx.readThrough(),
+             ctx.hasFlag(CLONE),
+             taskName,
+             deserializePortable);
+ 
+         return m.get(key);
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public final Map<K, V> getAll(Collection<? extends K> keys, boolean deserializePortable,
+         IgnitePredicate<CacheEntry<K, V>> filter)
+         throws IgniteCheckedException {
+         ctx.denyOnFlag(LOCAL);
+ 
+         String taskName = ctx.kernalContext().job().currentTaskName();
+ 
+         return getAllInternal(keys,
+             filter != null ? new IgnitePredicate[]{filter} : null,
+             ctx.isSwapOrOffheapEnabled(),
+             ctx.readThrough(),
+             ctx.hasFlag(CLONE),
+             taskName,
+             deserializePortable);
+     }
+ 
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<Map<K, V>> getAllAsync(
+         @Nullable final Collection<? extends K> keys,
+         final boolean forcePrimary,
+         boolean skipTx,
+         @Nullable final GridCacheEntryEx<K, V> entry,
+         @Nullable UUID subjId,
+         final String taskName,
+         final boolean deserializePortable,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         ctx.denyOnFlag(LOCAL);
+ 
+         final boolean swapOrOffheap = ctx.isSwapOrOffheapEnabled();
+         final boolean storeEnabled = ctx.readThrough();
+         final boolean clone = ctx.hasFlag(CLONE);
+ 
+         return asyncOp(new Callable<Map<K, V>>() {
+             @Override public Map<K, V> call() throws Exception {
+                 return getAllInternal(keys, filter, swapOrOffheap, storeEnabled, clone, taskName, deserializePortable);
+             }
+         });
+     }
+ 
+     /**
+      * Entry point to all public API get methods.
+      *
+      * @param keys Keys to remove.
+      * @param filter Filter.
+      * @param swapOrOffheap {@code True} if swap of off-heap storage are enabled.
+      * @param storeEnabled Store enabled flag.
+      * @param clone {@code True} if returned values should be cloned.
+      * @param taskName Task name.
+      * @param deserializePortable Deserialize portable .
+      * @return Key-value map.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("ConstantConditions")
+     private Map<K, V> getAllInternal(@Nullable Collection<? extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+         boolean swapOrOffheap,
+         boolean storeEnabled,
+         boolean clone,
+         String taskName,
+         boolean deserializePortable) throws IgniteCheckedException {
+         ctx.checkSecurity(GridSecurityPermission.CACHE_READ);
+ 
+         if (F.isEmpty(keys))
+             return Collections.emptyMap();
+ 
+         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+ 
+         UUID subjId = ctx.subjectIdPerCall(null, prj);
+ 
+         ExpiryPolicy expiryPlc = prj != null ? prj.expiry() : null;
+ 
+         if (expiryPlc == null)
+             expiryPlc = ctx.expiry();
+ 
+         Map<K, V> vals = new HashMap<>(keys.size(), 1.0f);
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         final GetExpiryPolicy expiry = accessExpiryPolicy(expiryPlc);
+ 
+         boolean success = true;
+ 
+         for (K key : keys) {
+             if (key == null)
+                 continue;
+ 
+             GridCacheEntryEx<K, V> entry = null;
+ 
+             while (true) {
+                 try {
+                     entry = swapOrOffheap ? entryEx(key) : peekEx(key);
+ 
+                     if (entry != null) {
+                         V v = entry.innerGet(null,
+                             /*swap*/swapOrOffheap,
+                             /*read-through*/false,
+                             /*fail-fast*/false,
+                             /*unmarshal*/true,
+                             /**update-metrics*/true,
+                             /**event*/true,
+                             /**temporary*/false,
+                             subjId,
+                             null,
+                             taskName,
+                             filter,
+                             expiry);
+ 
+                         if (v != null)
+                             vals.put(key, v);
+                         else
+                             success = false;
+                     }
+                     else {
 -                        if (!storeEnabled)
++                        if (!storeEnabled && configuration().isStatisticsEnabled())
+                             metrics0().onRead(false);
+ 
+                         success = false;
+                     }
+ 
+                     break; // While.
+                 }
+                 catch (GridCacheEntryRemovedException ignored) {
+                     // No-op, retry.
+                 }
+                 catch (GridCacheFilterFailedException ignored) {
+                     // No-op, skip the key.
+                     break;
+                 }
+                 finally {
+                     if (entry != null)
+                         ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+                 }
+ 
+                 if (!success && storeEnabled)
+                     break;
+             }
+         }
+ 
+         if (success || !storeEnabled) {
+             if (!clone)
+                 return vals;
+ 
+             Map<K, V> map = new GridLeanMap<>();
+ 
+             for (Map.Entry<K, V> e : vals.entrySet())
+                 map.put(e.getKey(), ctx.cloneValue(e.getValue()));
+ 
+             return map;
+         }
+ 
+         return getAllAsync(keys, true, null, false, subjId, taskName, deserializePortable, false, expiry, filter).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> EntryProcessorResult<T> invoke(K key,
+         EntryProcessor<K, V, T> entryProcessor,
+         Object... args) throws IgniteCheckedException {
+         return invokeAsync(key, entryProcessor, args).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys,
+         EntryProcessor<K, V, T> entryProcessor,
+         Object... args) throws IgniteCheckedException {
+         return invokeAllAsync(keys, entryProcessor, args).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public <T> IgniteFuture<EntryProcessorResult<T>> invokeAsync(K key,
+         EntryProcessor<K, V, T> entryProcessor,
+         Object... args) throws EntryProcessorException {
+         A.notNull(key, "key", entryProcessor, "entryProcessor");
+ 
+         if (keyCheck)
+             validateCacheKey(key);
+ 
+         ctx.denyOnLocalRead();
+ 
+         Map<? extends K, EntryProcessor> invokeMap =
+             Collections.singletonMap(key, (EntryProcessor)entryProcessor);
+ 
+         IgniteFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null,
+             invokeMap,
+             args,
+             true,
+             false,
+             null);
+ 
+         return fut.chain(new CX1<IgniteFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() {
+             @Override public EntryProcessorResult<T> applyx(IgniteFuture<Map<K, EntryProcessorResult<T>>> fut)
+                 throws IgniteCheckedException {
+                 Map<K, EntryProcessorResult<T>> resMap = fut.get();
+ 
+                 if (resMap != null) {
+                     assert resMap.isEmpty() || resMap.size() == 1 : resMap.size();
+ 
+                     return resMap.isEmpty() ? null : resMap.values().iterator().next();
+                 }
+ 
+                 return null;
+             }
+         });
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+         Set<? extends K> keys,
+         final EntryProcessor<K, V, T> entryProcessor,
+         Object... args) {
+         A.notNull(keys, "keys", entryProcessor, "entryProcessor");
+ 
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         ctx.denyOnLocalRead();
+ 
+         Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() {
+             @Override public EntryProcessor apply(K k) {
+                 return entryProcessor;
+             }
+         });
+ 
+         return updateAllAsync0(null,
+             invokeMap,
+             args,
+             true,
+             false,
+             null);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(
+         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+         Object... args) throws IgniteCheckedException {
+         return invokeAllAsync(map, args).get();
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public <T> IgniteFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(
+         Map<? extends K, ? extends EntryProcessor<K, V, T>> map,
+         Object... args) {
+         A.notNull(map, "map");
+ 
+         if (keyCheck)
+             validateCacheKeys(map.keySet());
+ 
+         ctx.denyOnLocalRead();
+ 
+         return updateAllAsync0(null,
+             map,
+             args,
+             true,
+             false,
+             null);
+     }
+ 
+     /**
+      * Entry point for public API update methods.
+      *
+      * @param map Put map. Either {@code map} or {@code invokeMap} should be passed.
+      * @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param retval Return value required flag.
+      * @param rawRetval Return {@code GridCacheReturn} instance.
+      * @param filter Cache entry filter for atomic updates.
+      * @return Completion future.
+      */
+     private IgniteFuture updateAllAsync0(
+         @Nullable final Map<? extends K, ? extends V> map,
+         @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap,
+         @Nullable final Object[] invokeArgs,
+         final boolean retval,
+         final boolean rawRetval,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE;
+ 
+         final Collection<? extends K> keys =
+             map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null;
+ 
+         final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null;
+ 
+         final boolean writeThrough = ctx.writeThrough();
+ 
+         final ExpiryPolicy expiry = expiryPerCall();
+ 
+         return asyncOp(new Callable<Object>() {
+             @Override public Object call() throws Exception {
+                 return updateAllInternal(op,
+                     keys,
+                     vals,
+                     invokeArgs,
+                     expiry,
+                     retval,
+                     rawRetval,
+                     filter,
+                     writeThrough);
+             }
+         });
+     }
+ 
+     /**
+      * Entry point for public API remove methods.
+      *
+      * @param keys Keys to remove.
+      * @param retval Return value required flag.
+      * @param rawRetval Return {@code GridCacheReturn} instance.
+      * @param filter Cache entry filter.
+      * @return Completion future.
+      */
+     private IgniteFuture removeAllAsync0(
+         @Nullable final Collection<? extends K> keys,
+         final boolean retval,
+         final boolean rawRetval,
+         @Nullable final IgnitePredicate<CacheEntry<K, V>>[] filter
+     ) {
+         final boolean writeThrough = ctx.writeThrough();
+ 
++        final boolean statsEnabled = ctx.config().isStatisticsEnabled();
++
++        final long start = statsEnabled ? System.nanoTime() : 0L;
++
+         final ExpiryPolicy expiryPlc = expiryPerCall();
+ 
 -        return asyncOp(new Callable<Object>() {
++        IgniteFuture fut = asyncOp(new Callable<Object>() {
+             @Override public Object call() throws Exception {
+                 return updateAllInternal(DELETE,
+                     keys,
+                     null,
+                     null,
+                     expiryPlc,
+                     retval,
+                     rawRetval,
+                     filter,
+                     writeThrough);
+             }
+         });
++
++        if (statsEnabled)
++            fut.listenAsync(new UpdateRemoveTimeStatClosure<>(metrics0(), start));
++
++        return fut;
+     }
+ 
+     /**
+      * Entry point for all public update methods (put, remove, invoke).
+      *
+      * @param op Operation.
+      * @param keys Keys.
+      * @param vals Values.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param expiryPlc Expiry policy.
+      * @param retval Return value required flag.
+      * @param rawRetval Return {@code GridCacheReturn} instance.
+      * @param filter Cache entry filter.
+      * @param storeEnabled Store enabled flag.
+      * @return Update result.
+      * @throws IgniteCheckedException If failed.
+      */
+     @SuppressWarnings("unchecked")
+     private Object updateAllInternal(GridCacheOperation op,
+         Collection<? extends K> keys,
+         @Nullable Iterable<?> vals,
+         @Nullable Object[] invokeArgs,
+         @Nullable ExpiryPolicy expiryPlc,
+         boolean retval,
+         boolean rawRetval,
+         IgnitePredicate<CacheEntry<K, V>>[] filter,
+         boolean storeEnabled) throws IgniteCheckedException {
+         if (keyCheck)
+             validateCacheKeys(keys);
+ 
+         if (op == DELETE)
+             ctx.checkSecurity(GridSecurityPermission.CACHE_REMOVE);
+         else
+             ctx.checkSecurity(GridSecurityPermission.CACHE_PUT);
+ 
+         String taskName = ctx.kernalContext().job().currentTaskName();
+ 
+         GridCacheVersion ver = ctx.versions().next();
+ 
+         UUID subjId = ctx.subjectIdPerCall(null);
+ 
+         if (storeEnabled && keys.size() > 1) {
+             return updateWithBatch(op,
+                 keys,
+                 vals,
+                 invokeArgs,
+                 expiryPlc,
+                 ver,
+                 filter,
+                 subjId,
+                 taskName);
+         }
+ 
+         Iterator<?> valsIter = vals != null ? vals.iterator() : null;
+ 
+         IgniteBiTuple<Boolean, ?> res = null;
+ 
+         GridCachePartialUpdateException err = null;
+ 
+         boolean intercept = ctx.config().getInterceptor() != null;
+ 
+         for (K key : keys) {
+             Object val = valsIter != null ? valsIter.next() : null;
+ 
+             if (key == null)
+                 continue;
+ 
+             while (true) {
+                 GridCacheEntryEx<K, V> entry = null;
+ 
+                 try {
+                     entry = entryEx(key);
+ 
+                     GridTuple3<Boolean, V, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+                         ver,
+                         val == null ? DELETE : op,
+                         val,
+                         invokeArgs,
+                         storeEnabled,
+                         retval,
+                         expiryPlc,
+                         true,
+                         true,
+                         filter,
+                         intercept,
+                         subjId,
+                         taskName);
+ 
+                     if (op == TRANSFORM) {
+                         if (t.get3() != null) {
+                             Map<K, EntryProcessorResult> computedMap;
+ 
+                             if (res == null) {
+                                 computedMap = U.newHashMap(keys.size());
+ 
+                                 res = new IgniteBiTuple<>(true, computedMap);
+                             }
+                             else
+                                 computedMap = (Map<K, EntryProcessorResult>)res.get2();
+ 
+                             computedMap.put(key, t.get3());
+                         }
+                     }
+                     else if (res == null)
+                         res = new T2(t.get1(), t.get2());
+ 
+                     break; // While.
+                 }
+                 catch (GridCacheEntryRemovedException ignored) {
+                     if (log.isDebugEnabled())
+                         log.debug("Got removed entry while updating (will retry): " + key);
+ 
+                     entry = null;
+                 }
+                 catch (IgniteCheckedException e) {
+                     if (err == null)
+                         err = partialUpdateException();
+ 
+                     err.add(F.asList(key), e);
+ 
+                     U.error(log, "Failed to update key : " + key, e);
+ 
+                     break;
+                 }
+                 finally {
+                     if (entry != null)
+                         ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion());
+                 }
+             }
+         }
+ 
+         if (err != null)
+             throw err;
+ 
+         Object ret = res == null ? null : rawRetval ?
+             new GridCacheReturn<>(res.get2(), res.get1()) : retval ? res.get2() : res.get1();
+ 
+         if (op == TRANSFORM && ret == null)
+             ret = Collections.emptyMap();
+ 
+         return ret;
+     }
+ 
+     /**
+      * Updates entries using batched write-through.
+      *
+      * @param op Operation.
+      * @param keys Keys.
+      * @param vals Values.
+      * @param invokeArgs Optional arguments for EntryProcessor.
+      * @param expiryPlc Expiry policy.
+      * @param ver Cache version.
+      * @param filter Optional filter.
+      * @param subjId Subject ID.
+      * @param taskName Task name.
+      * @throws GridCachePartialUpdateException If update failed.
+      * @return Results map for invoke operation.
+      */
+     @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"})
+     private Map<K, EntryProcessorResult> updateWithBatch(
+         GridCacheOperation op,
+         Collection<? extends K> keys,
+         @Nullable Iterable<?> vals,
+         @Nullable Object[] invokeArgs,
+         @Nullable ExpiryPolicy expiryPlc,
+         GridCacheVersion ver,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>[] filter,
+         UUID subjId,
+         String taskName
+     ) throws IgniteCheckedException {
+         List<GridCacheEntryEx<K, V>> locked = lockEntries(keys);
+ 
+         try {
+             int size = locked.size();
+ 
+             Map<K, V> putMap = null;
+ 
+             Collection<K> rmvKeys = null;
+ 
+             Map<K, EntryProcessorResult> invokeResMap =
+                 op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null;
+ 
+             List<GridCacheEntryEx<K, V>> filtered = new ArrayList<>(size);
+ 
+             GridCachePartialUpdateException err = null;
+ 
+             Iterator<?> valsIter = vals != null ? vals.iterator() : null;
+ 
+             boolean intercept = ctx.config().getInterceptor() != null;
+ 
+             for (int i = 0; i < size; i++) {
+                 GridCacheEntryEx<K, V> entry = locked.get(i);
+ 
+                 Object val = valsIter != null ? valsIter.next() : null;
+ 
+                 if (val == null && op != DELETE)
+                     continue;
+ 
+                 try {
+                     try {
+                         if (!ctx.isAll(entry.wrapFilterLocked(), filter)) {
+                             if (log.isDebugEnabled())
+                                 log.debug("Entry did not pass the filter (will skip write) [entry=" + entry +
+                                     ", filter=" + Arrays.toString(filter) + ']');
+ 
+                             continue;
+                         }
+                     }
+                     catch (IgniteCheckedException e) {
+                         if (err == null)
+                             err = partialUpdateException();
+ 
+                         err.add(F.asList(entry.key()), e);
+ 
+                         continue;
+                     }
+ 
+                     if (op == TRANSFORM) {
+                         EntryProcessor<K, V, ?> entryProcessor = (EntryProcessor<K, V, ?>)val;
+ 
+                         V old = entry.innerGet(null,
+                             /*swap*/true,
+                             /*read-through*/true,
+                             /*fail-fast*/false,
+                             /*unmarshal*/true,
+                             /**update-metrics*/true,
+                             /**event*/true,
+                             /**temporary*/true,
+                             subjId,
+                             entryProcessor,
+                             taskName,
+                             CU.<K, V>empty(),
+                             null);
+ 
+                         CacheInvokeEntry<K, V> invokeEntry = new CacheInvokeEntry<>(entry.key(), old);
+ 
+                         V updated;
+                         CacheInvokeResult invokeRes = null;
+ 
+                         try {
+                             Object computed = entryProcessor.process(invokeEntry, invokeArgs);
+ 
+                             updated = ctx.unwrapTemporary(invokeEntry.getValue());
+ 
+                             if (computed != null)
+                                 invokeRes = new CacheInvokeResult<>(ctx.unwrapTemporary(computed));
+                         }
+                         catch (Exception e) {
+                             invokeRes = new CacheInvokeResult<>(e);
+ 
+                             updated = old;
+                         }
+ 
+                         if (invokeRes != null)
+                             invokeResMap.put(entry.key(), invokeRes);
+ 
+                         if (updated == null) {
+                             if (intercept) {
+                                 IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
+                                     entry.key(), old);
+ 
+                                 if (ctx.cancelRemove(interceptorRes))
+                                     continue;
+                             }
+ 
+                             // Update previous batch.
+                             if (putMap != null) {
+                                 err = updatePartialBatch(
+                                     filtered,
+                                     ver,
+                                     putMap,
+                                     null,
+                                     expiryPlc,
+                                     err,
+                                     subjId,
+                                     taskName);
+ 
+                                 putMap = null;
+ 
+                                 filtered = new ArrayList<>();
+                             }
+ 
+                             // Start collecting new batch.
+                             if (rmvKeys == null)
+                                 rmvKeys = new ArrayList<>(size);
+ 
+                             rmvKeys.add(entry.key());
+                         }
+                         else {
+                             if (intercept) {
+                                 updated = (V)ctx.config().getInterceptor().onBeforePut(entry.key(), old, updated);
+ 
+                                 if (updated == null)
+                                     continue;
+                             }
+ 
+                             // Update previous batch.
+                             if (rmvKeys != null) {
+                                 err = updatePartialBatch(
+                                     filtered,
+                                     ver,
+                                     null,
+                                     rmvKeys,
+                                     expiryPlc,
+                                     err,
+                                     subjId,
+                                     taskName);
+ 
+                                 rmvKeys = null;
+ 
+                                 filtered = new ArrayList<>();
+                             }
+ 
+                             if (putMap == null)
+                                 putMap = new LinkedHashMap<>(size, 1.0f);
+ 
+                             putMap.put(entry.key(), ctx.<V>unwrapTemporary(updated));
+                         }
+                     }
+                     else if (op == UPDATE) {
+                         if (intercept) {
+                             V old = entry.innerGet(null,
+                                 /*swap*/true,
+                                 /*read-through*/ctx.loadPreviousValue(),
+                                 /*fail-fast*/false,
+                                 /*unmarshal*/true,
+                                 /**update-metrics*/true,
+                                 /**event*/true,
+                                 /**temporary*/true,
+                                 subjId,
+                                 null,
+                                 taskName,
+                                 CU.<K, V>empty(),
+                                 null);
+ 
+                             val = ctx.config().getInterceptor().onBeforePut(entry.key(), old, val);
+ 
+                             if (val == null)
+                                 continue;
+ 
+                             val = ctx.unwrapTemporary(val);
+                         }
+ 
+                         if (putMap == null)
+                             putMap = new LinkedHashMap<>(size, 1.0f);
+ 
+                         putMap.put(entry.key(), (V)val);
+                     }
+                     else {
+                         assert op == DELETE;
+ 
+                         if (intercept) {
+                             V old = entry.innerGet(null,
+                                 /*swap*/true,
+                                 /*read-through*/ctx.loadPreviousValue(),
+                                 /*fail-fast*/false,
+                                 /*unmarshal*/true,
+                                 /**update-metrics*/true,
+                                 /**event*/true,
+                                 /**temporary*/true,
+                                 subjId,
+                                 null,
+                                 taskName,
+                                 CU.<K, V>empty(),
+                                 null);
+ 
+                             IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(
+                                 entry.key(), old);
+ 
+                             if (ctx.cancelRemove(interceptorRes))
+                                 continue;
+                         }
+ 
+                         if (rmvKeys == null)
+                             rmvKeys = new ArrayList<>(size);
+ 
+                         rmvKeys.add(entry.key());
+                     }
+ 
+                     filtered.add(entry);
+                 }
+                 catch (IgniteCheckedException e) {
+                     if (err == null)
+                         err = partialUpdateException();
+ 
+                     err.add(F.asList(entry.key()), e);
+                 }
+                 catch (GridCacheEntryRemovedException ignore) {
+                     assert false : "Entry cannot become obsolete while holding lock.";
+                 }
+                 catch (GridCacheFilterFailedException ignore) {
+                     assert false : "Filter should never fail with failFast=false and empty filter.";
+                 }
+             }
+ 
+             // Store final batch.
+             if (putMap != null || rmvKeys != null) {
+                 err = updatePartialBatch(
+                     filtered,
+                     ver,
+                     putMap,
+                     rmvKeys,
+                     expiryPlc,
+                     err,
+                     subjId,
+                     taskName);
+             }
+             else
+                 assert filtered.isEmpty();
+ 
+             if (err != null)
+                 throw err;
+ 
+             return invokeResMap;
+         }
+         finally {
+             unlockEntries(locked);
+         }
+     }
+ 
+     /**
+      * @param entries Entries to update.
+      * @param ver Cache version.
+      * @param putMap Values to put.
+      * @param rmvKeys Keys to remove.
+      * @param expiryPlc Expiry policy.
+      * @param err Optional partial update exception.
+      * @param subjId Subject ID.
+      * @param taskName Task name.
+      * @return Partial update exception.
+      */
+     @SuppressWarnings({"unchecked", "ConstantConditions", "ForLoopReplaceableByForEach"})
+     @Nullable private GridCachePartialUpdateException updatePartialBatch(
+         List<GridCacheEntryEx<K, V>> entries,
+         final GridCacheVersion ver,
+         @Nullable Map<K, V> putMap,
+         @Nullable Collection<K> rmvKeys,
+         @Nullable ExpiryPolicy expiryPlc,
+         @Nullable GridCachePartialUpdateException err,
+         UUID subjId,
+         String taskName
+     ) {
+         assert putMap == null ^ rmvKeys == null;
+         GridCacheOperation op;
+ 
+         CacheStorePartialUpdateException storeErr = null;
+ 
+         try {
+             if (putMap != null) {
+                 try {
+                     ctx.store().putAllToStore(null, F.viewReadOnly(putMap, new C1<V, IgniteBiTuple<V, GridCacheVersion>>() {
+                         @Override public IgniteBiTuple<V, GridCacheVersion> apply(V v) {
+                             return F.t(v, ver);
+                         }
+                     }));
+                 }
+                 catch (CacheStorePartialUpdateException e) {
+                     storeErr = e;
+                 }
+ 
+                 op = UPDATE;
+             }
+             else {
+                 try {
+                     ctx.store().removeAllFromStore(null, rmvKeys);
+                 }
+                 catch (CacheStorePartialUpdateException e) {
+                     storeErr = e;
+                 }
+ 
+                 op = DELETE;
+             }
+         }
+         catch (IgniteCheckedException e) {
+             if (err == null)
+                 err = partialUpdateException();
+ 
+             err.add(putMap != null ? putMap.keySet() : rmvKeys, e);
+ 
+             return err;
+         }
+ 
+         boolean intercept = ctx.config().getInterceptor() != null;
+ 
+         for (int i = 0; i < entries.size(); i++) {
+             GridCacheEntryEx<K, V> entry = entries.get(i);
+ 
+             assert Thread.holdsLock(entry);
+ 
+             if (entry.obsolete() || (storeErr != null && storeErr.failedKeys().contains(entry.key())))
+                 continue;
+ 
+             try {
+                 // We are holding java-level locks on entries at this point.
+                 V writeVal = op == UPDATE ? putMap.get(entry.key()) : null;
+ 
+                 assert writeVal != null || op == DELETE : "null write value found.";
+ 
+                 GridTuple3<Boolean, V, EntryProcessorResult<Object>> t = entry.innerUpdateLocal(
+                     ver,
+                     op,
+                     writeVal,
+                     null,
+                     false,
+                     false,
+                     expiryPlc,
+                     true,
+                     true,
+                     null,
+                     false,
+                     subjId,
+                     taskName);
+ 
+                 if (intercept) {
+                     if (op == UPDATE)
+                         ctx.config().getInterceptor().onAfterPut(entry.key(), writeVal);
+                     else
+                         ctx.config().getInterceptor().onAfterRemove(entry.key(), t.get2());
+                 }
+             }
+             catch (GridCacheEntryRemovedException ignore) {
+                 assert false : "Entry cannot become obsolete while holding lock.";
+             }
+             catch (IgniteCheckedException e) {
+                 if (err == null)
+                     err = partialUpdateException();
+ 
+                 err.add(Collections.singleton(entry.key()), e);
+             }
+         }
+ 
+         return err;
+     }
+ 
+     /**
+      * Acquires java-level locks on cache entries.
+      *
+      * @param keys Keys to lock.
+      * @return Collection of locked entries.
+      */
+     private List<GridCacheEntryEx<K, V>> lockEntries(Collection<? extends K> keys) {
+         List<GridCacheEntryEx<K, V>> locked = new ArrayList<>(keys.size());
+ 
+         while (true) {
+             for (K key : keys) {
+                 if (key == null)
+                     continue;
+ 
+                 GridCacheEntryEx<K, V> entry = entryEx(key);
+ 
+                 locked.add(entry);
+             }
+ 
+             for (int i = 0; i < locked.size(); i++) {
+                 GridCacheEntryEx<K, V> entry = locked.get(i);
+ 
+                 UNSAFE.monitorEnter(entry);
+ 
+                 if (entry.obsolete()) {
+                     // Unlock all locked.
+                     for (int j = 0; j <= i; j++)
+                         UNSAFE.monitorExit(locked.get(j));
+ 
+                     // Clear entries.
+                     locked.clear();
+ 
+                     // Retry.
+                     break;
+                 }
+             }
+ 
+             if (!locked.isEmpty())
+                 return locked;
+         }
+     }
+ 
+     /**
+      * Releases java-level locks on cache entries.
+      *
+      * @param locked Locked entries.
+      */
+     private void unlockEntries(Iterable<GridCacheEntryEx<K, V>> locked) {
+         for (GridCacheEntryEx<K, V> entry : locked)
+             UNSAFE.monitorExit(entry);
+ 
+         long topVer = ctx.affinity().affinityTopologyVersion();
+ 
+         for (GridCacheEntryEx<K, V> entry : locked)
+             ctx.evicts().touch(entry, topVer);
+     }
+ 
+     /**
+      * @return New partial update exception.
+      */
+     private static GridCachePartialUpdateException partialUpdateException() {
+         return new GridCachePartialUpdateException("Failed to update keys (retry update if possible).");
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteFuture<Boolean> txLockAsync(Collection<? extends K> keys,
+         long timeout,
+         IgniteTxLocalEx<K, V> tx,
+         boolean isRead,
+         boolean retval,
+         IgniteTxIsolation isolation,
+         boolean invalidate,
+         long accessTtl,
+         IgnitePredicate<CacheEntry<K, V>>[] filter) {
+         return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " +
+             "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"));
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public IgniteFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys,
+         long timeout,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) {
+         return new GridFinishedFutureEx<>(new UnsupportedOperationException("Locks are not supported for " +
+             "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"));
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("unchecked")
+     @Override public void unlockAll(@Nullable Collection<? extends K> keys,
+         @Nullable IgnitePredicate<CacheEntry<K, V>>... filter) throws IgniteCheckedException {
+         throw new UnsupportedOperationException("Locks are not supported for " +
+             "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)");
+     }
+ 
+     /**
+      * @return Expiry policy.
+      */
+     @Nullable private ExpiryPolicy expiryPerCall() {
+         GridCacheProjectionImpl<K, V> prj = ctx.projectionPerCall();
+ 
+         ExpiryPolicy expiry = prj != null ? prj.expiry() : null;
+ 
+         if (expiry == null)
+             expiry = ctx.expiry();
+ 
+         return expiry;
+     }
+ 
+     /**
+      * @param op Operation closure.
+      * @return Future.
+      */
+     @SuppressWarnings("unchecked")
+     protected IgniteFuture asyncOp(final Callable<?> op) {
+         IgniteFuture fail = asyncOpAcquire();
+ 
+         if (fail != null)
+             return fail;
+ 
+         FutureHolder holder = lastFut.get();
+ 
+         holder.lock();
+ 
+         try {
+             IgniteFuture fut = holder.future();
+ 
+             if (fut != null && !fut.isDone()) {
+                 IgniteFuture f = new GridEmbeddedFuture(fut,
+                     new C2<Object, Exception, IgniteFuture>() {
+                         @Override public IgniteFuture apply(Object t, Exception e) {
+                             return ctx.closures().callLocalSafe(op);
+                         }
+                     }, ctx.kernalContext());
+ 
+                 saveFuture(holder, f);
+ 
+                 return f;
+             }
+ 
+             IgniteFuture f = ctx.closures().callLocalSafe(op);
+ 
+             saveFuture(holder, f);
+ 
+             return f;
+         }
+         finally {
+             holder.unlock();
+         }
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onDeferredDelete(GridCacheEntryEx<K, V> entry, GridCacheVersion ver) {
+         assert false : "Should not be called";
+     }
+ }


Mime
View raw message