ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [10/50] [abbrv] incubator-ignite git commit: # ignite-63
Date Thu, 22 Jan 2015 22:04:05 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtUnlockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
deleted file mode 100644
index 4230b3a..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtUnlockRequest.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.gridgain.grid.kernal.processors.cache.distributed.*;
-import org.apache.ignite.internal.util.direct.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-
-import java.io.*;
-import java.nio.*;
-import java.util.*;
-
-/**
- * DHT cache unlock request.
- */
-public class GridDhtUnlockRequest<K, V> extends GridDistributedUnlockRequest<K, V> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Near keys. */
-    @GridDirectCollection(byte[].class)
-    private List<byte[]> nearKeyBytes;
-
-    /** */
-    @GridDirectTransient
-    private List<K> nearKeys;
-
-    /**
-     * Empty constructor required by {@link Externalizable}.
-     */
-    public GridDhtUnlockRequest() {
-        // No-op.
-    }
-
-    /**
-     * @param cacheId Cache ID.
-     * @param dhtCnt Key count.
-     */
-    public GridDhtUnlockRequest(int cacheId, int dhtCnt) {
-        super(cacheId, dhtCnt);
-    }
-
-    /**
-     * @return Near keys.
-     */
-    public List<byte[]> nearKeyBytes() {
-        return nearKeyBytes != null ? nearKeyBytes : Collections.<byte[]>emptyList();
-    }
-
-    /**
-     * @return Near keys.
-     */
-    public List<K> nearKeys() {
-        return nearKeys;
-    }
-
-    /**
-     * Adds a Near key.
-     *
-     * @param key Key.
-     * @param keyBytes Key bytes.
-     * @param ctx Context.
-     * @throws IgniteCheckedException If failed.
-     */
-    public void addNearKey(K key, byte[] keyBytes, GridCacheSharedContext<K, V> ctx) throws IgniteCheckedException {
-        if (ctx.deploymentEnabled())
-            prepareObject(key, ctx);
-
-        if (nearKeyBytes == null)
-            nearKeyBytes = new ArrayList<>();
-
-        nearKeyBytes.add(keyBytes);
-    }
-
-    /** {@inheritDoc} */
-    @Override public void finishUnmarshal(GridCacheSharedContext<K, V> ctx, ClassLoader ldr) throws IgniteCheckedException {
-        super.finishUnmarshal(ctx, ldr);
-
-        if (nearKeyBytes != null)
-            nearKeys = unmarshalCollection(nearKeyBytes, ctx, ldr);
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridDhtUnlockRequest.class, this);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"})
-    @Override public GridTcpCommunicationMessageAdapter clone() {
-        GridDhtUnlockRequest _clone = new GridDhtUnlockRequest();
-
-        clone0(_clone);
-
-        return _clone;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) {
-        super.clone0(_msg);
-
-        GridDhtUnlockRequest _clone = (GridDhtUnlockRequest)_msg;
-
-        _clone.nearKeyBytes = nearKeyBytes;
-        _clone.nearKeys = nearKeys;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean writeTo(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.writeTo(buf))
-            return false;
-
-        if (!commState.typeWritten) {
-            if (!commState.putByte(directType()))
-                return false;
-
-            commState.typeWritten = true;
-        }
-
-        switch (commState.idx) {
-            case 9:
-                if (nearKeyBytes != null) {
-                    if (commState.it == null) {
-                        if (!commState.putInt(nearKeyBytes.size()))
-                            return false;
-
-                        commState.it = nearKeyBytes.iterator();
-                    }
-
-                    while (commState.it.hasNext() || commState.cur != NULL) {
-                        if (commState.cur == NULL)
-                            commState.cur = commState.it.next();
-
-                        if (!commState.putByteArray((byte[])commState.cur))
-                            return false;
-
-                        commState.cur = NULL;
-                    }
-
-                    commState.it = null;
-                } else {
-                    if (!commState.putInt(-1))
-                        return false;
-                }
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("all")
-    @Override public boolean readFrom(ByteBuffer buf) {
-        commState.setBuffer(buf);
-
-        if (!super.readFrom(buf))
-            return false;
-
-        switch (commState.idx) {
-            case 9:
-                if (commState.readSize == -1) {
-                    if (buf.remaining() < 4)
-                        return false;
-
-                    commState.readSize = commState.getInt();
-                }
-
-                if (commState.readSize >= 0) {
-                    if (nearKeyBytes == null)
-                        nearKeyBytes = new ArrayList<>(commState.readSize);
-
-                    for (int i = commState.readItems; i < commState.readSize; i++) {
-                        byte[] _val = commState.getByteArray();
-
-                        if (_val == BYTE_ARR_NOT_READ)
-                            return false;
-
-                        nearKeyBytes.add((byte[])_val);
-
-                        commState.readItems++;
-                    }
-                }
-
-                commState.readSize = -1;
-                commState.readItems = 0;
-
-                commState.idx++;
-
-        }
-
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public byte directType() {
-        return 35;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
deleted file mode 100644
index 125f0e9..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridNoStorageCacheMap.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Empty cache map that will never store any entries.
- */
-public class GridNoStorageCacheMap<K, V> extends GridCacheConcurrentMap<K, V> {
-    /** Empty triple. */
-    private final GridTriple<GridCacheMapEntry<K,V>> emptyTriple =
-        new GridTriple<>(null, null, null);
-
-    /**
-     * @param ctx Cache context.
-     */
-    public GridNoStorageCacheMap(GridCacheContext<K, V> ctx) {
-        super(ctx, 0, 0.75f, 1);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isEmpty() {
-        return true;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int size() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public int publicSize() {
-        return 0;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean containsKey(Object key) {
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> randomEntry() {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> getEntry(Object key) {
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> putEntry(long topVer, K key, @Nullable V val, long ttl) {
-        throw new AssertionError();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridTriple<GridCacheMapEntry<K, V>> putEntryIfObsoleteOrAbsent(long topVer, K key, @Nullable V val,
-        long ttl, boolean create) {
-        if (create) {
-            GridCacheMapEntry<K, V> entry = new GridDhtCacheEntry<>(ctx, topVer, key, hash(key.hashCode()), val,
-                null, 0, 0);
-
-            return new GridTriple<>(entry, null, null);
-        }
-        else
-            return emptyTriple;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void putAll(Map<? extends K, ? extends V> m, long ttl) {
-        throw new AssertionError();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean removeEntry(GridCacheEntryEx<K, V> e) {
-        throw new AssertionError();
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheMapEntry<K, V> removeEntryIfObsolete(K key) {
-        throw new AssertionError();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridNoStorageCacheMap.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
deleted file mode 100644
index a4088ec..0000000
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ /dev/null
@@ -1,715 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.gridgain.grid.kernal.processors.cache.distributed.dht;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.lang.*;
-import org.gridgain.grid.kernal.processors.cache.*;
-import org.apache.ignite.internal.processors.cache.distributed.near.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.IgniteSystemProperties.*;
-
-/**
- * Colocated get future.
- */
-public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>>
-    implements GridCacheFuture<Map<K, V>> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** Default max remap count value. */
-    public static final int DFLT_MAX_REMAP_CNT = 3;
-
-    /** Logger reference. */
-    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
-
-    /** Maximum number of attempts to remap key to the same primary node. */
-    private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(GG_NEAR_GET_MAX_REMAPS,
-        DFLT_MAX_REMAP_CNT);
-
-    /** Context. */
-    private GridCacheContext<K, V> cctx;
-
-    /** Keys. */
-    private Collection<? extends K> keys;
-
-    /** Topology version. */
-    private long topVer;
-
-    /** Reload flag. */
-    private boolean reload;
-
-    /** Read-through flag. */
-    private boolean readThrough;
-
-    /** Force primary flag. */
-    private boolean forcePrimary;
-
-    /** Future ID. */
-    private IgniteUuid futId;
-
-    /** Version. */
-    private GridCacheVersion ver;
-
-    /** Filters. */
-    private IgnitePredicate<GridCacheEntry<K, V>>[] filters;
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Trackable flag. */
-    private volatile boolean trackable;
-
-    /** Remap count. */
-    private AtomicInteger remapCnt = new AtomicInteger();
-
-    /** Subject ID. */
-    private UUID subjId;
-
-    /** Task name. */
-    private String taskName;
-
-    /** Whether to deserialize portable objects. */
-    private boolean deserializePortable;
-
-    /** Expiry policy. */
-    private IgniteCacheExpiryPolicy expiryPlc;
-
-    /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridPartitionedGetFuture() {
-        // No-op.
-    }
-
-    /**
-     * @param cctx Context.
-     * @param keys Keys.
-     * @param topVer Topology version.
-     * @param readThrough Read through flag.
-     * @param reload Reload flag.
-     * @param forcePrimary If {@code true} then will force network trip to primary node even
-     *          if called on backup node.
-     * @param filters Filters.
-     * @param subjId Subject ID.
-     * @param taskName Task name.
-     * @param deserializePortable Deserialize portable flag.
-     * @param expiryPlc Expiry policy.
-     */
-    public GridPartitionedGetFuture(
-        GridCacheContext<K, V> cctx,
-        Collection<? extends K> keys,
-        long topVer,
-        boolean readThrough,
-        boolean reload,
-        boolean forcePrimary,
-        @Nullable IgnitePredicate<GridCacheEntry<K, V>>[] filters,
-        @Nullable UUID subjId,
-        String taskName,
-        boolean deserializePortable,
-        @Nullable IgniteCacheExpiryPolicy expiryPlc
-    ) {
-        super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
-
-        assert !F.isEmpty(keys);
-
-        this.cctx = cctx;
-        this.keys = keys;
-        this.topVer = topVer;
-        this.readThrough = readThrough;
-        this.reload = reload;
-        this.forcePrimary = forcePrimary;
-        this.filters = filters;
-        this.subjId = subjId;
-        this.deserializePortable = deserializePortable;
-        this.taskName = taskName;
-        this.expiryPlc = expiryPlc;
-
-        futId = IgniteUuid.randomUuid();
-
-        ver = cctx.versions().next();
-
-        log = U.logger(ctx, logRef, GridPartitionedGetFuture.class);
-    }
-
-    /**
-     * Initializes future.
-     */
-    public void init() {
-        long topVer = this.topVer > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
-
-        map(keys, Collections.<ClusterNode, LinkedHashMap<K, Boolean>>emptyMap(), topVer);
-
-        markInitialized();
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean trackable() {
-        return trackable;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void markNotTrackable() {
-        // Should not flip trackable flag from true to false since get future can be remapped.
-    }
-
-    /**
-     * @return Keys.
-     */
-    Collection<? extends K> keys() {
-        return keys;
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteUuid futureId() {
-        return futId;
-    }
-
-    /** {@inheritDoc} */
-    @Override public GridCacheVersion version() {
-        return ver;
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public Collection<? extends ClusterNode> nodes() {
-        return
-            F.viewReadOnly(futures(), new IgniteClosure<IgniteFuture<Map<K, V>>, ClusterNode>() {
-                @Nullable @Override public ClusterNode apply(IgniteFuture<Map<K, V>> f) {
-                    if (isMini(f))
-                        return ((MiniFuture)f).node();
-
-                    return cctx.discovery().localNode();
-                }
-            });
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onNodeLeft(UUID nodeId) {
-        for (IgniteFuture<Map<K, V>> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.node().id().equals(nodeId)) {
-                    f.onResult(new ClusterTopologyException("Remote node left grid (will retry): " + nodeId));
-
-                    return true;
-                }
-            }
-
-        return false;
-    }
-
-    /**
-     * @param nodeId Sender.
-     * @param res Result.
-     */
-    public void onResult(UUID nodeId, GridNearGetResponse<K, V> res) {
-        for (IgniteFuture<Map<K, V>> fut : futures())
-            if (isMini(fut)) {
-                MiniFuture f = (MiniFuture)fut;
-
-                if (f.futureId().equals(res.miniId())) {
-                    assert f.node().id().equals(nodeId);
-
-                    f.onResult(res);
-                }
-            }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean onDone(Map<K, V> res, Throwable err) {
-        if (super.onDone(res, err)) {
-            // Don't forget to clean up.
-            if (trackable)
-                cctx.mvcc().removeFuture(this);
-
-            cache().sendTtlUpdateRequest(expiryPlc);
-
-            return true;
-        }
-
-        return false;
-    }
-
-    /**
-     * @param f Future.
-     * @return {@code True} if mini-future.
-     */
-    private boolean isMini(IgniteFuture<Map<K, V>> f) {
-        return f.getClass().equals(MiniFuture.class);
-    }
-
-    /**
-     * @param keys Keys.
-     * @param mapped Mappings to check for duplicates.
-     * @param topVer Topology version on which keys should be mapped.
-     */
-    private void map(Collection<? extends K> keys, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped, long topVer) {
-        if (CU.affinityNodes(cctx, topVer).isEmpty()) {
-            onDone(new ClusterTopologyException("Failed to map keys for cache (all partition nodes left the grid)."));
-
-            return;
-        }
-
-        Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings = U.newHashMap(CU.affinityNodes(cctx, topVer).size());
-
-        final int keysSize = keys.size();
-
-        Map<K, V> locVals = U.newHashMap(keysSize);
-
-        boolean hasRmtNodes = false;
-
-        // Assign keys to primary nodes.
-        for (K key : keys) {
-            if (key != null)
-                hasRmtNodes |= map(key, mappings, locVals, topVer, mapped);
-        }
-
-        if (isDone())
-            return;
-
-        if (!locVals.isEmpty())
-            add(new GridFinishedFuture<>(cctx.kernalContext(), locVals));
-
-        if (hasRmtNodes) {
-            trackable = true;
-
-            cctx.mvcc().addFuture(this);
-        }
-
-        // Create mini futures.
-        for (Map.Entry<ClusterNode, LinkedHashMap<K, Boolean>> entry : mappings.entrySet()) {
-            final ClusterNode n = entry.getKey();
-
-            final LinkedHashMap<K, Boolean> mappedKeys = entry.getValue();
-
-            assert !mappedKeys.isEmpty();
-
-            // If this is the primary or backup node for the keys.
-            if (n.isLocal()) {
-                final GridDhtFuture<Collection<GridCacheEntryInfo<K, V>>> fut =
-                    cache().getDhtAsync(n.id(),
-                        -1,
-                        mappedKeys,
-                        readThrough,
-                        reload,
-                        topVer,
-                        subjId,
-                        taskName == null ? 0 : taskName.hashCode(),
-                        deserializePortable,
-                        filters,
-                        expiryPlc);
-
-                final Collection<Integer> invalidParts = fut.invalidPartitions();
-
-                if (!F.isEmpty(invalidParts)) {
-                    Collection<K> remapKeys = new ArrayList<>(keysSize);
-
-                    for (K key : keys) {
-                        if (key != null && invalidParts.contains(cctx.affinity().partition(key)))
-                            remapKeys.add(key);
-                    }
-
-                    long updTopVer = ctx.discovery().topologyVersion();
-
-                    assert updTopVer > topVer : "Got invalid partitions for local node but topology version did " +
-                        "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
-                        ", invalidParts=" + invalidParts + ']';
-
-                    // Remap recursively.
-                    map(remapKeys, mappings, updTopVer);
-                }
-
-                // Add new future.
-                add(fut.chain(new C1<IgniteFuture<Collection<GridCacheEntryInfo<K, V>>>, Map<K, V>>() {
-                    @Override public Map<K, V> apply(IgniteFuture<Collection<GridCacheEntryInfo<K, V>>> fut) {
-                        try {
-                            return createResultMap(fut.get());
-                        }
-                        catch (Exception e) {
-                            U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e);
-
-                            onDone(e);
-
-                            return Collections.emptyMap();
-                        }
-                    }
-                }));
-            }
-            else {
-                MiniFuture fut = new MiniFuture(n, mappedKeys, topVer);
-
-                GridCacheMessage<K, V> req = new GridNearGetRequest<>(
-                    cctx.cacheId(),
-                    futId,
-                    fut.futureId(),
-                    ver,
-                    mappedKeys,
-                    readThrough,
-                    reload,
-                    topVer,
-                    filters,
-                    subjId,
-                    taskName == null ? 0 : taskName.hashCode(),
-                    expiryPlc != null ? expiryPlc.forAccess() : -1L);
-
-                add(fut); // Append new future.
-
-                try {
-                    cctx.io().send(n, req);
-                }
-                catch (IgniteCheckedException e) {
-                    // Fail the whole thing.
-                    if (e instanceof ClusterTopologyException)
-                        fut.onResult((ClusterTopologyException)e);
-                    else
-                        fut.onResult(e);
-                }
-            }
-        }
-    }
-
-    /**
-     * @param mappings Mappings.
-     * @param key Key to map.
-     * @param locVals Local values.
-     * @param topVer Topology version.
-     * @param mapped Previously mapped.
-     * @return {@code True} if has remote nodes.
-     */
-    @SuppressWarnings("ConstantConditions")
-    private boolean map(K key, Map<ClusterNode, LinkedHashMap<K, Boolean>> mappings, Map<K, V> locVals,
-        long topVer, Map<ClusterNode, LinkedHashMap<K, Boolean>> mapped) {
-        GridDhtCacheAdapter<K, V> colocated = cache();
-
-        boolean remote = false;
-
-        // Allow to get cached value from the local node.
-        boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer);
-
-        while (true) {
-            GridCacheEntryEx<K, V> entry = null;
-
-            try {
-                if (!reload && allowLocRead) {
-                    try {
-                        entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) :
-                            colocated.peekEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            V v = entry.innerGet(null,
-                                /*swap*/true,
-                                /*read-through*/false,
-                                /*fail-fast*/true,
-                                /*unmarshal*/true,
-                                /**update-metrics*/true,
-                                /*event*/true,
-                                /*temporary*/false,
-                                subjId,
-                                null,
-                                taskName,
-                                filters,
-                                expiryPlc);
-
-                            colocated.context().evicts().touch(entry, topVer);
-
-                            // Entry was not in memory or in swap, so we remove it from cache.
-                            if (v == null) {
-                                if (isNew && entry.markObsoleteIfEmpty(ver))
-                                    colocated.removeIfObsolete(key);
-                            }
-                            else {
-                                if (cctx.portableEnabled())
-                                    v = (V)cctx.unwrapPortableIfNeeded(v, !deserializePortable);
-
-                                locVals.put(key, v);
-
-                                return false;
-                            }
-                        }
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        // No-op.
-                    }
-                }
-
-                ClusterNode node = cctx.affinity().primary(key, topVer);
-
-                remote = !node.isLocal();
-
-                LinkedHashMap<K, Boolean> keys = mapped.get(node);
-
-                if (keys != null && keys.containsKey(key)) {
-                    if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
-                        onDone(new ClusterTopologyException("Failed to remap key to a new node after " + MAX_REMAP_CNT
-                            + " attempts (key got remapped to the same node) [key=" + key + ", node=" +
-                            U.toShortString(node) + ", mappings=" + mapped + ']'));
-
-                        return false;
-                    }
-                }
-
-                LinkedHashMap<K, Boolean> old = mappings.get(node);
-
-                if (old == null)
-                    mappings.put(node, old = new LinkedHashMap<>(3, 1f));
-
-                old.put(key, false);
-
-                break;
-            }
-            catch (IgniteCheckedException e) {
-                onDone(e);
-
-                break;
-            }
-            catch (GridCacheEntryRemovedException ignored) {
-                // No-op, will retry.
-            }
-            catch (GridCacheFilterFailedException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Filter validation failed for entry: " + e);
-
-                colocated.context().evicts().touch(entry, topVer);
-
-                break;
-            }
-        }
-
-        return remote;
-    }
-
-    /**
-     * @return Near cache.
-     */
-    private GridDhtCacheAdapter<K, V> cache() {
-        return cctx.dht();
-    }
-
-    /**
-     * @param infos Entry infos.
-     * @return Result map.
-     */
-    private Map<K, V> createResultMap(Collection<GridCacheEntryInfo<K, V>> infos) {
-        int keysSize = infos.size();
-
-        try {
-            if (keysSize != 0) {
-                Map<K, V> map = new GridLeanMap<>(keysSize);
-
-                for (GridCacheEntryInfo<K, V> info : infos) {
-                    info.unmarshalValue(cctx, cctx.deploy().globalLoader());
-
-                    K key = info.key();
-                    V val = info.value();
-
-                    if (cctx.portableEnabled()) {
-                        key = (K)cctx.unwrapPortableIfNeeded(key, !deserializePortable);
-                        val = (V)cctx.unwrapPortableIfNeeded(val, !deserializePortable);
-                    }
-
-                    map.put(key, val);
-                }
-
-                return map;
-            }
-        }
-        catch (IgniteCheckedException e) {
-            // Fail.
-            onDone(e);
-        }
-
-        return Collections.emptyMap();
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(GridPartitionedGetFuture.class, this, super.toString());
-    }
-
-    /**
-     * Mini-future for get operations. Mini-futures are only waiting on a single
-     * node as opposed to multiple nodes.
-     */
-    private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
-        /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
-        private final IgniteUuid futId = IgniteUuid.randomUuid();
-
-        /** Node ID. */
-        private ClusterNode node;
-
-        /** Keys. */
-        @GridToStringInclude
-        private LinkedHashMap<K, Boolean> keys;
-
-        /** Topology version on which this future was mapped. */
-        private long topVer;
-
-        /**
-         * Empty constructor required for {@link Externalizable}.
-         */
-        public MiniFuture() {
-            // No-op.
-        }
-
-        /**
-         * @param node Node.
-         * @param keys Keys.
-         * @param topVer Topology version.
-         */
-        MiniFuture(ClusterNode node, LinkedHashMap<K, Boolean> keys, long topVer) {
-            super(cctx.kernalContext());
-
-            this.node = node;
-            this.keys = keys;
-            this.topVer = topVer;
-        }
-
-        /**
-         * @return Future ID.
-         */
-        IgniteUuid futureId() {
-            return futId;
-        }
-
-        /**
-         * @return Node ID.
-         */
-        public ClusterNode node() {
-            return node;
-        }
-
-        /**
-         * @return Keys.
-         */
-        public Collection<K> keys() {
-            return keys.keySet();
-        }
-
-        /**
-         * @param e Error.
-         */
-        void onResult(Throwable e) {
-            if (log.isDebugEnabled())
-                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');
-
-            // Fail.
-            onDone(e);
-        }
-
-        /**
-         * @param e Failure exception.
-         */
-        @SuppressWarnings("UnusedParameters")
-        void onResult(ClusterTopologyException e) {
-            if (log.isDebugEnabled())
-                log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
-
-            long updTopVer = ctx.discovery().topologyVersion();
-
-            assert updTopVer > topVer : "Got topology exception but topology version did " +
-                "not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
-                ", nodeId=" + node.id() + ']';
-
-            // Remap.
-            map(keys.keySet(), F.t(node, keys), updTopVer);
-
-            onDone(Collections.<K, V>emptyMap());
-        }
-
-        /**
-         * @param res Result callback.
-         */
-        @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
-        void onResult(final GridNearGetResponse<K, V> res) {
-            final Collection<Integer> invalidParts = res.invalidPartitions();
-
-            // If error happened on remote node, fail the whole future.
-            if (res.error() != null) {
-                onDone(res.error());
-
-                return;
-            }
-
-            // Remap invalid partitions.
-            if (!F.isEmpty(invalidParts)) {
-                long rmtTopVer = res.topologyVersion();
-
-                assert rmtTopVer != 0;
-
-                if (rmtTopVer <= topVer) {
-                    // Fail the whole get future.
-                    onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
-                        "invalid partitions but remote topology version does not differ from local) " +
-                        "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", invalidParts=" + invalidParts +
-                        ", nodeId=" + node.id() + ']'));
-
-                    return;
-                }
-
-                if (log.isDebugEnabled())
-                    log.debug("Remapping mini get future [invalidParts=" + invalidParts + ", fut=" + this + ']');
-
-                // Need to wait for next topology version to remap.
-                IgniteFuture<Long> topFut = ctx.discovery().topologyFuture(rmtTopVer);
-
-                topFut.listenAsync(new CIX1<IgniteFuture<Long>>() {
-                    @SuppressWarnings("unchecked")
-                    @Override public void applyx(IgniteFuture<Long> fut) throws IgniteCheckedException {
-                        long topVer = fut.get();
-
-                        // This will append new futures to compound list.
-                        map(F.view(keys.keySet(),  new P1<K>() {
-                            @Override public boolean apply(K key) {
-                                return invalidParts.contains(cctx.affinity().partition(key));
-                            }
-                        }), F.t(node, keys), topVer);
-
-                        onDone(createResultMap(res.entries()));
-                    }
-                });
-            }
-            else
-                onDone(createResultMap(res.entries()));
-        }
-
-        /** {@inheritDoc} */
-        @Override public String toString() {
-            return S.toString(MiniFuture.class, this);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java
new file mode 100644
index 0000000..23e34dc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+import org.apache.ignite.spi.swapspace.file.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for byte array values in PARTITIONED-ONLY caches.
+ */
+public abstract class GridCacheAbstractPartitionedOnlyByteArrayValuesSelfTest extends
+    GridCacheAbstractPartitionedByteArrayValuesSelfTest {
+    /** Offheap cache name. */
+    protected static final String CACHE_ATOMIC = "cache_atomic";
+
+    /** Offheap cache name. */
+    protected static final String CACHE_ATOMIC_OFFHEAP = "cache_atomic_offheap";
+
+    /** Offheap tiered cache name. */
+    protected static final String CACHE_ATOMIC_OFFHEAP_TIERED = "cache_atomic_offheap_tiered";
+
+    /** Atomic caches. */
+    private static GridCache<Integer, Object>[] cachesAtomic;
+
+    /** Atomic offheap caches. */
+    private static GridCache<Integer, Object>[] cachesAtomicOffheap;
+
+    /** Atomic offheap caches. */
+    private static GridCache<Integer, Object>[] cachesAtomicOffheapTiered;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        CacheConfiguration atomicCacheCfg = cacheConfiguration0();
+
+        atomicCacheCfg.setName(CACHE_ATOMIC);
+        atomicCacheCfg.setAtomicityMode(ATOMIC);
+        atomicCacheCfg.setAtomicWriteOrderMode(PRIMARY);
+
+        CacheConfiguration atomicOffheapCacheCfg = offheapCacheConfiguration0();
+
+        atomicOffheapCacheCfg.setName(CACHE_ATOMIC_OFFHEAP);
+        atomicOffheapCacheCfg.setAtomicityMode(ATOMIC);
+        atomicOffheapCacheCfg.setAtomicWriteOrderMode(PRIMARY);
+
+        CacheConfiguration atomicOffheapTieredCacheCfg = offheapTieredCacheConfiguration();
+
+        atomicOffheapTieredCacheCfg.setName(CACHE_ATOMIC_OFFHEAP_TIERED);
+        atomicOffheapTieredCacheCfg.setAtomicityMode(ATOMIC);
+        atomicOffheapTieredCacheCfg.setAtomicWriteOrderMode(PRIMARY);
+
+        c.setCacheConfiguration(cacheConfiguration(),
+            offheapCacheConfiguration(),
+            offheapTieredCacheConfiguration(),
+            atomicCacheCfg,
+            atomicOffheapCacheCfg,
+            atomicOffheapTieredCacheCfg);
+
+        c.setSwapSpaceSpi(new FileSwapSpaceSpi());
+
+        c.setPeerClassLoadingEnabled(peerClassLoading());
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        int gridCnt = gridCount();
+
+        cachesAtomic = new GridCache[gridCnt];
+        cachesAtomicOffheap = new GridCache[gridCnt];
+        cachesAtomicOffheapTiered = new GridCache[gridCnt];
+
+        for (int i = 0; i < gridCount(); i++) {
+            cachesAtomic[i] = ignites[i].cache(CACHE_ATOMIC);
+            cachesAtomicOffheap[i] = ignites[i].cache(CACHE_ATOMIC_OFFHEAP);
+            cachesAtomicOffheapTiered[i] = ignites[i].cache(CACHE_ATOMIC_OFFHEAP_TIERED);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        cachesAtomic = null;
+        cachesAtomicOffheap = null;
+        cachesAtomicOffheapTiered = null;
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * Test atomic cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomic() throws Exception {
+        testAtomic0(cachesAtomic);
+    }
+
+    /**
+     * Test atomic offheap cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheap() throws Exception {
+        testAtomic0(cachesAtomicOffheap);
+    }
+
+    /**
+     * Test atomic offheap cache.
+     *
+     * @throws Exception If failed.
+     */
+    public void testAtomicOffheapTiered() throws Exception {
+        testAtomic0(cachesAtomicOffheapTiered);
+    }
+
+    /**
+     * INternal routine for ATOMIC cache testing.
+     *
+     * @param caches Caches.
+     * @throws Exception If failed.
+     */
+    private void testAtomic0(GridCache<Integer, Object>[] caches) throws Exception {
+        byte[] val = wrap(1);
+
+        for (GridCache<Integer, Object> cache : caches) {
+            cache.put(KEY_1, val);
+
+            for (GridCache<Integer, Object> cacheInner : caches)
+                assertArrayEquals(val, (byte[])cacheInner.get(KEY_1));
+
+            cache.remove(KEY_1);
+
+            assertNull(cache.get(KEY_1));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
new file mode 100644
index 0000000..3066d94
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAbstractTransformWriteThroughSelfTest.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.marshaller.optimized.*;
+import org.apache.ignite.transactions.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.gridgain.testframework.junits.common.*;
+
+import javax.cache.configuration.*;
+import javax.cache.processor.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.transactions.IgniteTxConcurrency.*;
+import static org.apache.ignite.transactions.IgniteTxIsolation.*;
+
+/**
+ * Tests write-through.
+ */
+public abstract class GridCacheAbstractTransformWriteThroughSelfTest extends GridCommonAbstractTest {
+    /** Grid count. */
+    protected static final int GRID_CNT = 3;
+
+    /** Update operation. */
+    protected static final int OP_UPDATE = 0;
+
+    /** Delete operation. */
+    protected static final int OP_DELETE = 1;
+
+    /** Near node constant. */
+    protected static final int NEAR_NODE = 0;
+
+    /** Primary node constant. */
+    protected static final int PRIMARY_NODE = 0;
+
+    /** Backup node constant. */
+    protected static final int BACKUP_NODE = 0;
+
+    /** Keys number. */
+    public static final int KEYS_CNT = 30;
+
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Value increment processor. */
+    private static final EntryProcessor<String, Integer, Void> INCR_CLOS = new EntryProcessor<String, Integer, Void>() {
+        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+            if (!e.exists())
+                e.setValue(1);
+            else
+                e.setValue(e.getValue() + 1);
+
+            return null;
+        }
+    };
+
+    /** Value remove processor. */
+    private static final EntryProcessor<String, Integer, Void> RMV_CLOS = new EntryProcessor<String, Integer, Void>() {
+        @Override public Void process(MutableEntry<String, Integer> e, Object... args) {
+            e.remove();
+
+            return null;
+        }
+    };
+
+    /** Test store. */
+    private static List<GridCacheGenericTestStore<String, Integer>> stores =
+        new ArrayList<>(GRID_CNT);
+
+    /**
+     * @return {@code True} if batch update is enabled.
+     */
+    protected abstract boolean batchUpdate();
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new IgniteOptimizedMarshaller(false));
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        GridCacheGenericTestStore<String, Integer> store = new GridCacheGenericTestStore<>();
+
+        stores.add(store);
+
+        CacheConfiguration cacheCfg = defaultCacheConfiguration();
+
+        cacheCfg.setCacheMode(PARTITIONED);
+        cacheCfg.setBackups(1);
+        cacheCfg.setCacheStoreFactory(new FactoryBuilder.SingletonFactory(store));
+        cacheCfg.setReadThrough(true);
+        cacheCfg.setWriteThrough(true);
+        cacheCfg.setLoadPreviousValue(true);
+        cacheCfg.setAtomicityMode(TRANSACTIONAL);
+        cacheCfg.setDistributionMode(NEAR_PARTITIONED);
+
+        cfg.setCacheConfiguration(cacheCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        for (int i = 0; i < GRID_CNT; i++)
+            startGrid(i);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids(true);
+
+        stores.clear();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        for (GridCacheGenericTestStore<String, Integer> store : stores)
+            store.reset();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformOptimisticNearUpdate() throws Exception {
+        checkTransform(OPTIMISTIC, NEAR_NODE, OP_UPDATE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformOptimisticPrimaryUpdate() throws Exception {
+        checkTransform(OPTIMISTIC, PRIMARY_NODE, OP_UPDATE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformOptimisticBackupUpdate() throws Exception {
+        checkTransform(OPTIMISTIC, BACKUP_NODE, OP_UPDATE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformOptimisticNearDelete() throws Exception {
+        checkTransform(OPTIMISTIC, NEAR_NODE, OP_DELETE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformOptimisticPrimaryDelete() throws Exception {
+        checkTransform(OPTIMISTIC, PRIMARY_NODE, OP_DELETE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformOptimisticBackupDelete() throws Exception {
+        checkTransform(OPTIMISTIC, BACKUP_NODE, OP_DELETE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPessimisticNearUpdate() throws Exception {
+        checkTransform(PESSIMISTIC, NEAR_NODE, OP_UPDATE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPessimisticPrimaryUpdate() throws Exception {
+        checkTransform(PESSIMISTIC, PRIMARY_NODE, OP_UPDATE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPessimisticBackupUpdate() throws Exception {
+        checkTransform(PESSIMISTIC, BACKUP_NODE, OP_UPDATE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPessimisticNearDelete() throws Exception {
+        checkTransform(PESSIMISTIC, NEAR_NODE, OP_DELETE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPessimisticPrimaryDelete() throws Exception {
+        checkTransform(PESSIMISTIC, PRIMARY_NODE, OP_DELETE);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTransformPessimisticBackupDelete() throws Exception {
+        checkTransform(PESSIMISTIC, BACKUP_NODE, OP_DELETE);
+    }
+
+    /**
+     * @param concurrency Concurrency.
+     * @param nodeType Node type.
+     * @param op Op.
+     * @throws Exception If failed.
+     */
+    protected void checkTransform(IgniteTxConcurrency concurrency, int nodeType, int op) throws Exception {
+        IgniteCache<String, Integer> cache = jcache(0);
+
+        Collection<String> keys = keysForType(nodeType);
+
+        for (String key : keys)
+            cache.put(key, 1);
+
+        GridCacheGenericTestStore<String, Integer> nearStore = stores.get(0);
+
+        nearStore.reset();
+
+        for (String key : keys)
+            cache(0).clear(key);
+
+        info(">>> Starting transform transaction");
+
+        try (IgniteTx tx = ignite(0).transactions().txStart(concurrency, READ_COMMITTED)) {
+            if (op == OP_UPDATE) {
+                for (String key : keys)
+                    cache.invoke(key, INCR_CLOS);
+            }
+            else {
+                for (String key : keys)
+                    cache.invoke(key, RMV_CLOS);
+            }
+
+            tx.commit();
+        }
+
+        if (batchUpdate()) {
+            assertEquals(0, nearStore.getPutCount());
+            assertEquals(0, nearStore.getRemoveCount());
+
+            if (op == OP_UPDATE)
+                assertEquals(1, nearStore.getPutAllCount());
+            else
+                assertEquals(1, nearStore.getRemoveAllCount());
+        }
+        else {
+            assertEquals(0, nearStore.getPutAllCount());
+            assertEquals(0, nearStore.getRemoveAllCount());
+
+            if (op == OP_UPDATE)
+                assertEquals(keys.size(), nearStore.getPutCount());
+            else
+                assertEquals(keys.size(), nearStore.getRemoveCount());
+        }
+
+        if (op == OP_UPDATE) {
+            for (String key : keys)
+                assertEquals((Integer)2, nearStore.getMap().get(key));
+        }
+        else {
+            for (String key : keys)
+                assertNull(nearStore.getMap().get(key));
+        }
+    }
+
+    /**
+     * @param nodeType Node type to generate keys for.
+     * @return Collection of keys.
+     */
+    private Collection<String> keysForType(int nodeType) {
+        Collection<String> keys = new ArrayList<>(KEYS_CNT);
+
+        int numKey = 0;
+
+        while (keys.size() < 30) {
+            String key = String.valueOf(numKey);
+
+            if (nodeType == NEAR_NODE) {
+                if (!cache(0).affinity().isPrimaryOrBackup(grid(0).localNode(), key))
+                    keys.add(key);
+            }
+            else if (nodeType == PRIMARY_NODE) {
+                if (cache(0).affinity().isPrimary(grid(0).localNode(), key))
+                    keys.add(key);
+            }
+            else if (nodeType == BACKUP_NODE) {
+                if (cache(0).affinity().isBackup(grid(0).localNode(), key))
+                    keys.add(key);
+            }
+
+            numKey++;
+        }
+
+        return keys;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java
new file mode 100644
index 0000000..5002961
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicExpiredEntriesPreloadSelfTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.gridgain.grid.kernal.processors.cache.distributed.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests preloading of expired entries.
+ */
+public class GridCacheAtomicExpiredEntriesPreloadSelfTest extends GridCacheExpiredEntriesPreloadAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheMode cacheMode() {
+        return PARTITIONED;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicFullApiSelfTest.java
new file mode 100644
index 0000000..2b90f9c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicFullApiSelfTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Multi node test for disabled near cache.
+ */
+public class GridCacheAtomicFullApiSelfTest extends GridCachePartitionedFullApiSelfTest {
+    /**
+     * @return Write order mode for atomic cache.
+     */
+    protected GridCacheAtomicWriteOrderMode atomicWriteOrderMode() {
+        return CLOCK;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheAtomicityMode atomicityMode() {
+        return ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return PARTITIONED_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean txEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean lockingEnabled() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(gridName);
+
+        ccfg.setAtomicWriteOrderMode(atomicWriteOrderMode());
+
+        return ccfg;
+    }
+
+    /**
+     * @throws Exception In case of error.
+     */
+    @Override public void testGetAll() throws Exception {
+        cache().put("key1", 1);
+        cache().put("key2", 2);
+
+        assert cache().getAll((Collection<String>)null).isEmpty();
+        assert cache().getAll(Collections.<String>emptyList()).isEmpty();
+
+        Map<String, Integer> map1 = cache().getAll(F.asList("key1", "key2", "key9999"));
+
+        info("Retrieved map1: " + map1);
+
+        assert 2 == map1.size() : "Invalid map: " + map1;
+
+        assertEquals(1, (int)map1.get("key1"));
+        assertEquals(2, (int)map1.get("key2"));
+        assertNull(map1.get("key9999"));
+
+        Map<String, Integer> map2 = cache().getAll(F.asList("key1", "key2", "key9999"));
+
+        info("Retrieved map2: " + map2);
+
+        assert 2 == map2.size() : "Invalid map: " + map2;
+
+        assertEquals(1, (int)map2.get("key1"));
+        assertEquals(2, (int)map2.get("key2"));
+        assertNull(map2.get("key9999"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
new file mode 100644
index 0000000..fd75eb8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearCacheSelfTest.java
@@ -0,0 +1,820 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.gridgain.grid.kernal.processors.cache.*;
+import org.apache.ignite.internal.processors.cache.distributed.near.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.gridgain.testframework.junits.common.*;
+import org.jetbrains.annotations.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.util.*;
+
+import static org.apache.ignite.cache.GridCacheAtomicWriteOrderMode.*;
+import static org.apache.ignite.cache.GridCacheAtomicityMode.*;
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+import static org.apache.ignite.cache.GridCacheMode.*;
+import static org.apache.ignite.cache.GridCachePreloadMode.*;
+import static org.apache.ignite.cache.GridCacheWriteSynchronizationMode.*;
+import static org.apache.ignite.internal.GridNodeAttributes.*;
+
+/**
+ * Tests near cache with various atomic cache configuration.
+ */
+public class GridCacheAtomicNearCacheSelfTest extends GridCommonAbstractTest {
+    /** */
+    private static final int GRID_CNT = 4;
+
+    /** */
+    private static final int PRIMARY = 0;
+
+    /** */
+    private static final int BACKUP = 1;
+
+    /** */
+    private static final int NOT_PRIMARY_AND_BACKUP = 2;
+
+    /** */
+    private int backups;
+
+    /** */
+    private GridCacheAtomicWriteOrderMode writeOrderMode;
+
+    /** */
+    private int lastKey;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+        ccfg.setAtomicityMode(ATOMIC);
+        ccfg.setDistributionMode(NEAR_PARTITIONED);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setPreloadMode(SYNC);
+
+        assert writeOrderMode != null;
+
+        ccfg.setAtomicWriteOrderMode(writeOrderMode);
+        ccfg.setBackups(backups);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoBackupsPrimaryWriteOrder() throws Exception {
+        startGrids(0, GridCacheAtomicWriteOrderMode.PRIMARY);
+
+        checkNearCache();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testNoBackupsClockWriteOrder() throws Exception {
+        startGrids(0, CLOCK);
+
+        checkNearCache();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWithBackupsPrimaryWriteOrder() throws Exception {
+        startGrids(2, GridCacheAtomicWriteOrderMode.PRIMARY);
+
+        checkNearCache();
+    }
+
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testWithBackupsClockWriteOrder() throws Exception {
+        startGrids(2, CLOCK);
+
+        checkNearCache();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkNearCache() throws Exception {
+        checkPut();
+
+        checkPutAll();
+
+        checkTransform();
+
+        checkTransformAll();
+
+        checkRemove();
+
+        checkReaderEvict();
+
+        checkReaderRemove();
+
+        checkPutRemoveGet();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkPutAll() throws Exception {
+        log.info("Check putAll.");
+
+        Ignite ignite0 = grid(0);
+
+        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        GridCacheAffinity<Integer> aff = cache0.affinity();
+
+        UUID id0 = ignite0.cluster().localNode().id();
+
+        Map<Integer, Integer> primaryKeys = new HashMap<>();
+
+        for (int i = 0; i < 10; i++)
+            primaryKeys.put(key(ignite0, PRIMARY), 1);
+
+        log.info("PutAll from primary.");
+
+        cache0.putAll(primaryKeys);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            for (Integer primaryKey : primaryKeys.keySet())
+                checkEntry(grid(i), primaryKey, 1, false);
+        }
+
+        if (backups > 0) {
+            Map<Integer, Integer> backupKeys = new HashMap<>();
+
+            for (int i = 0; i < 10; i++)
+                backupKeys.put(key(ignite0, BACKUP), 2);
+
+            log.info("PutAll from backup.");
+
+            cache0.putAll(backupKeys);
+
+            for (int i = 0; i < GRID_CNT; i++) {
+                for (Integer backupKey : backupKeys.keySet())
+                    checkEntry(grid(i), backupKey, 2, false);
+            }
+        }
+
+        Map<Integer, Integer> nearKeys = new HashMap<>();
+
+        for (int i = 0; i < 30; i++)
+            nearKeys.put(key(ignite0, NOT_PRIMARY_AND_BACKUP), 3);
+
+        log.info("PutAll from near.");
+
+        cache0.putAll(nearKeys);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            for (Integer nearKey : nearKeys.keySet()) {
+                UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+                checkEntry(grid(i), nearKey, 3, i == 0, expReaders);
+            }
+        }
+
+        Map<Integer, Collection<UUID>> readersMap = new HashMap<>();
+
+        for (Integer key : nearKeys.keySet())
+            readersMap.put(key, new HashSet<UUID>());
+
+        int val = 4;
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            delay();
+
+            GridCache<Integer, Integer> cache = grid(i).cache(null);
+
+            for (Integer key : nearKeys.keySet())
+                nearKeys.put(key, val);
+
+            log.info("PutAll [grid=" + grid(i).name() + ", val=" + val + ']');
+
+            cache.putAll(nearKeys);
+
+            for (Integer key : nearKeys.keySet()) {
+                if (!aff.isPrimaryOrBackup(grid(i).localNode(), key))
+                    readersMap.get(key).add(grid(i).localNode().id());
+            }
+
+            for (int j = 0; j < GRID_CNT; j++) {
+                for (Integer key : nearKeys.keySet()) {
+                    boolean primaryNode = aff.isPrimary(grid(j).localNode(), key);
+
+                    Collection<UUID> readers = readersMap.get(key);
+
+                    UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[]{};
+
+                    checkEntry(grid(j), key, val, readers.contains(grid(j).localNode().id()), expReaders);
+                }
+            }
+
+            val++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkTransform() throws Exception {
+        log.info("Check transform.");
+
+        Ignite ignite0 = grid(0);
+
+        IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null);
+
+        GridCacheAffinity<Object> aff = cache(0).affinity();
+
+        UUID id0 = ignite0.cluster().localNode().id();
+
+        Integer primaryKey = key(ignite0, PRIMARY);
+
+        log.info("Transform from primary.");
+
+        cache0.invoke(primaryKey, new Processor(primaryKey));
+
+        for (int i = 0; i < GRID_CNT; i++)
+            checkEntry(grid(i), primaryKey, primaryKey, false);
+
+        if (backups > 0) {
+            Integer backupKey = key(ignite0, BACKUP);
+
+            log.info("Transform from backup.");
+
+            cache0.invoke(backupKey, new Processor(backupKey));
+
+            for (int i = 0; i < GRID_CNT; i++)
+                checkEntry(grid(i), backupKey, backupKey, false);
+        }
+
+        Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP);
+
+        log.info("Transform from near.");
+
+        cache0.invoke(nearKey, new Processor(nearKey));
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+            checkEntry(grid(i), nearKey, nearKey, i == 0, expReaders);
+        }
+
+        Collection<UUID> readers = new HashSet<>();
+
+        readers.add(id0);
+
+        int val = nearKey + 1;
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            delay();
+
+            IgniteCache<Integer, Integer> cache = grid(i).jcache(null);
+
+            log.info("Transform [grid=" + grid(i).name() + ", val=" + val + ']');
+
+            cache.invoke(nearKey, new Processor(val));
+
+            if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey))
+                readers.add(grid(i).localNode().id());
+
+            for (int j = 0; j < GRID_CNT; j++) {
+                boolean primaryNode = aff.isPrimary(grid(j).localNode(), nearKey);
+
+                UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[]{};
+
+                checkEntry(grid(j), nearKey, val, readers.contains(grid(j).localNode().id()), expReaders);
+            }
+
+            val++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkTransformAll() throws Exception {
+        log.info("Check transformAll.");
+
+        Ignite ignite0 = grid(0);
+
+        IgniteCache<Integer, Integer> cache0 = ignite0.jcache(null);
+
+        GridCacheAffinity<Object> aff = ignite0.cache(null).affinity();
+
+        UUID id0 = ignite0.cluster().localNode().id();
+
+        Set<Integer> primaryKeys = new HashSet<>();
+
+        for (int i = 0; i < 10; i++)
+            primaryKeys.add(key(ignite0, PRIMARY));
+
+        log.info("TransformAll from primary.");
+
+        cache0.invokeAll(primaryKeys, new Processor(1));
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            for (Integer primaryKey : primaryKeys)
+                checkEntry(grid(i), primaryKey, 1, false);
+        }
+
+        if (backups > 0) {
+            Set<Integer> backupKeys = new HashSet<>();
+
+            for (int i = 0; i < 10; i++)
+                backupKeys.add(key(ignite0, BACKUP));
+
+            log.info("TransformAll from backup.");
+
+            cache0.invokeAll(backupKeys, new Processor(2));
+
+            for (int i = 0; i < GRID_CNT; i++) {
+                for (Integer backupKey : backupKeys)
+                    checkEntry(grid(i), backupKey, 2, false);
+            }
+        }
+
+        Set<Integer> nearKeys = new HashSet<>();
+
+        for (int i = 0; i < 30; i++)
+            nearKeys.add(key(ignite0, NOT_PRIMARY_AND_BACKUP));
+
+        log.info("TransformAll from near.");
+
+        cache0.invokeAll(nearKeys, new Processor(3));
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            for (Integer nearKey : nearKeys) {
+                UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+                checkEntry(grid(i), nearKey, 3, i == 0, expReaders);
+            }
+        }
+
+        Map<Integer, Collection<UUID>> readersMap = new HashMap<>();
+
+        for (Integer key : nearKeys)
+            readersMap.put(key, new HashSet<UUID>());
+
+        int val = 4;
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            delay();
+
+            IgniteCache<Integer, Integer> cache = grid(i).jcache(null);
+
+            for (Integer key : nearKeys)
+                nearKeys.add(key);
+
+            log.info("TransformAll [grid=" + grid(i).name() + ", val=" + val + ']');
+
+            cache.invokeAll(nearKeys, new Processor(val));
+
+            for (Integer key : nearKeys) {
+                if (!aff.isPrimaryOrBackup(grid(i).localNode(), key))
+                    readersMap.get(key).add(grid(i).localNode().id());
+            }
+
+            for (int j = 0; j < GRID_CNT; j++) {
+                for (Integer key : nearKeys) {
+                    boolean primaryNode = aff.isPrimary(grid(j).localNode(), key);
+
+                    Collection<UUID> readers = readersMap.get(key);
+
+                    UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[]{};
+
+                    checkEntry(grid(j), key, val, readers.contains(grid(j).localNode().id()), expReaders);
+                }
+            }
+
+            val++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkPutRemoveGet() throws Exception {
+        Ignite ignite0 = grid(0);
+
+        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        Integer key = key(ignite0, NOT_PRIMARY_AND_BACKUP);
+
+        cache0.put(key, key);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grid(i).cache(null).get(key);
+
+        cache0.remove(key);
+
+        cache0.put(key, key);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            grid(i).cache(null).get(key);
+
+        cache0.remove(key);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    private void checkPut() throws Exception {
+        checkPut(0);
+
+        checkPut(GRID_CNT - 1);
+    }
+
+    /**
+     * @param grid Grid Index.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkPut(int grid) throws Exception {
+        log.info("Check put, grid: " + grid);
+
+        Ignite ignite0 = grid(grid);
+
+        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        GridCacheAffinity<Integer> aff = cache0.affinity();
+
+        UUID id0 = ignite0.cluster().localNode().id();
+
+        Integer primaryKey = key(ignite0, PRIMARY);
+
+        log.info("Put from primary.");
+
+        cache0.put(primaryKey, primaryKey);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            checkEntry(grid(i), primaryKey, primaryKey, false);
+
+        if (backups > 0) {
+            Integer backupKey = key(ignite0, BACKUP);
+
+            log.info("Put from backup.");
+
+            cache0.put(backupKey, backupKey);
+
+            for (int i = 0; i < GRID_CNT; i++)
+                checkEntry(grid(i), backupKey, backupKey, false);
+        }
+
+        Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP);
+
+        log.info("Put from near.");
+
+        cache0.put(nearKey, nearKey);
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+            checkEntry(grid(i), nearKey, nearKey, i == grid, expReaders);
+        }
+
+        Collection<UUID> readers = new HashSet<>();
+
+        readers.add(id0);
+
+        int val = nearKey + 1;
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            delay();
+
+            GridCache<Integer, Integer> cache = grid(i).cache(null);
+
+            log.info("Put [grid=" + grid(i).name() + ", val=" + val + ']');
+
+            cache.put(nearKey, val);
+
+            if (!aff.isPrimaryOrBackup(grid(i).localNode(), nearKey))
+                readers.add(grid(i).localNode().id());
+
+            for (int j = 0; j < GRID_CNT; j++) {
+                boolean primaryNode = aff.isPrimary(grid(j).localNode(), nearKey);
+
+                UUID[] expReaders = primaryNode ? U.toArray(readers, new UUID[readers.size()]) : new UUID[]{};
+
+                checkEntry(grid(j), nearKey, val, readers.contains(grid(j).localNode().id()), expReaders);
+            }
+
+            val++;
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkRemove() throws Exception {
+        log.info("Check remove.");
+
+        Ignite ignite0 = grid(0);
+
+        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        Integer primaryKey = key(ignite0, PRIMARY);
+
+        log.info("Put from primary.");
+
+        cache0.put(primaryKey, primaryKey);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            checkEntry(grid(i), primaryKey, primaryKey, false);
+
+        log.info("Remove from primary.");
+
+        cache0.remove(primaryKey);
+
+        for (int i = 0; i < GRID_CNT; i++)
+            checkEntry(grid(i), primaryKey, null, false);
+
+        if (backups > 0) {
+            Integer backupKey = key(ignite0, BACKUP);
+
+            log.info("Put from backup.");
+
+            cache0.put(backupKey, backupKey);
+
+            for (int i = 0; i < GRID_CNT; i++)
+                checkEntry(grid(i), backupKey, backupKey, false);
+
+            log.info("Remove from backup.");
+
+            cache0.remove(backupKey);
+
+            for (int i = 0; i < GRID_CNT; i++)
+                checkEntry(grid(i), backupKey, null, false);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkReaderEvict() throws Exception {
+        log.info("Check evict.");
+
+        Ignite ignite0 = grid(0);
+
+        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        GridCacheAffinity<Integer> aff = cache0.affinity();
+
+        UUID id0 = ignite0.cluster().localNode().id();
+
+        Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP);
+
+        cache0.put(nearKey, 1); // Put should create near entry on grid0.
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+            checkEntry(grid(i), nearKey, 1, i == 0, expReaders);
+        }
+
+        cache0.evict(nearKey); // Remove near entry on grid0.
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+            checkEntry(grid(i), nearKey, 1, false, expReaders);
+        }
+
+        GridCache<Integer, Integer> primaryCache = G.ignite(
+            (String) aff.mapKeyToNode(nearKey).attribute(ATTR_GRID_NAME)).cache(null);
+
+        delay();
+
+        primaryCache.put(nearKey, 2); // This put should see that near entry evicted on grid0 and remove reader.
+
+        for (int i = 0; i < GRID_CNT; i++)
+            checkEntry(grid(i), nearKey, 2, false);
+
+        assertEquals((Integer)2, cache0.get(nearKey)); // Get should again create near entry on grid0.
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+            checkEntry(grid(i), nearKey, 2, i == 0, expReaders);
+        }
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ZeroLengthArrayAllocation")
+    private void checkReaderRemove() throws Exception {
+        Ignite ignite0 = grid(0);
+
+        GridCache<Integer, Integer> cache0 = ignite0.cache(null);
+
+        GridCacheAffinity<Integer> aff = cache0.affinity();
+
+        UUID id0 = ignite0.cluster().localNode().id();
+
+        Integer nearKey = key(ignite0, NOT_PRIMARY_AND_BACKUP);
+
+        cache0.put(nearKey, 1); // Put should create near entry on grid0.
+
+        for (int i = 0; i < GRID_CNT; i++) {
+            UUID[] expReaders = aff.isPrimary(grid(i).localNode(), nearKey) ? new UUID[]{id0} : new UUID[]{};
+
+            checkEntry(grid(i), nearKey, 1, i == 0, expReaders);
+        }
+
+        cache0.remove(nearKey); // Remove from grid0, this should remove readers on primary node.
+
+        for (int i = 0; i < GRID_CNT; i++)
+            checkEntry(grid(i), nearKey, null, i == 0);
+
+        Ignite primaryNode = G.ignite((String) aff.mapKeyToNode(nearKey).attribute(ATTR_GRID_NAME));
+
+        delay();
+
+        GridCache<Integer, Integer> primaryCache = primaryNode.cache(null);
+
+        primaryCache.put(nearKey, 2); // Put from primary, check there are no readers.
+
+        checkEntry(primaryNode, nearKey, 2, false);
+    }
+
+    /**
+     * @param ignite Node.
+     * @param key Key.
+     * @param val Expected value.
+     * @param expectNear If {@code true} then near cache entry is expected.
+     * @param expReaders Expected readers.
+     * @throws Exception If failed.
+     */
+    @SuppressWarnings("ConstantConditions")
+    private void checkEntry(Ignite ignite, Integer key, @Nullable Integer val, boolean expectNear, UUID... expReaders)
+        throws Exception {
+        GridCacheAdapter<Integer, Integer> near = ((GridKernal) ignite).internalCache();
+
+        assertTrue(near.isNear());
+
+        GridCacheEntryEx<Integer, Integer> nearEntry = near.peekEx(key);
+
+        if (expectNear) {
+            assertNotNull("No near entry for: " + key + ", grid: " + ignite.name(), nearEntry);
+
+            assertEquals("Unexpected value for grid: " + ignite.name(), val, nearEntry.info().value());
+        }
+        else
+            assertNull("Unexpected near entry: " + nearEntry + ", grid: " + ignite.name(), nearEntry);
+
+        GridDhtCacheAdapter<Integer, Integer> dht = ((GridNearCacheAdapter<Integer, Integer>)near).dht();
+
+        GridDhtCacheEntry<Integer, Integer> dhtEntry = (GridDhtCacheEntry<Integer, Integer>)dht.peekEx(key);
+
+        boolean expectDht = near.affinity().isPrimaryOrBackup(ignite.cluster().localNode(), key);
+
+        if (expectDht) {
+            assertNotNull("No dht entry for: " + key + ", grid: " + ignite.name(), dhtEntry);
+
+            Collection<UUID> readers = dhtEntry.readers();
+
+            assertEquals(expReaders.length, readers.size());
+
+            for (UUID reader : expReaders)
+                assertTrue(readers.contains(reader));
+
+            assertEquals("Unexpected value for grid: " + ignite.name(), val, dhtEntry.info().value());
+        }
+        else
+            assertNull("Unexpected dht entry: " + dhtEntry + ", grid: " + ignite.name(), dhtEntry);
+    }
+
+    /**
+     * @param ignite Grid.
+     * @param mode One of {@link #PRIMARY}, {@link #BACKUP} or {@link #NOT_PRIMARY_AND_BACKUP}.
+     * @return Key with properties specified by the given mode.
+     */
+    private Integer key(Ignite ignite, int mode) {
+        GridCache<Integer, Integer> cache = ignite.cache(null);
+
+        GridCacheAffinity<Integer> aff = cache.affinity();
+
+        Integer key = null;
+
+        for (int i = lastKey + 1; i < 1_000_000; i++) {
+            boolean pass = false;
+
+            switch(mode) {
+                case PRIMARY: pass = aff.isPrimary(ignite.cluster().localNode(), i); break;
+
+                case BACKUP: pass = aff.isBackup(ignite.cluster().localNode(), i); break;
+
+                case NOT_PRIMARY_AND_BACKUP: pass = !aff.isPrimaryOrBackup(ignite.cluster().localNode(), i); break;
+
+                default: fail();
+            }
+
+            lastKey = i;
+
+            if (pass) {
+                key = i;
+
+                break;
+            }
+        }
+
+        assertNotNull(key);
+
+        return key;
+    }
+
+    /**
+     * @throws IgniteCheckedException If failed.
+     */
+    private void delay() throws IgniteCheckedException {
+        if (writeOrderMode == CLOCK)
+            U.sleep(100);
+    }
+
+    /**
+     * @param backups Backups number.
+     * @param writeOrderMode Write order mode.
+     * @throws Exception If failed.
+     */
+    private void startGrids(int backups, GridCacheAtomicWriteOrderMode writeOrderMode) throws Exception {
+        this.backups = backups;
+
+        this.writeOrderMode = writeOrderMode;
+
+        startGrids(GRID_CNT);
+
+        awaitPartitionMapExchange();
+
+        log.info("Grids: ");
+
+        for (int i = 0; i < GRID_CNT; i++)
+            log.info(grid(i).name() + ": " + grid(i).localNode().id());
+    }
+
+    /**
+     *
+     */
+    private static class Processor implements EntryProcessor<Integer, Integer, Void>, Serializable {
+        /** */
+        private final Integer newVal;
+
+        /**
+         * @param newVal New value.
+         */
+        private Processor(Integer newVal) {
+            this.newVal = newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<Integer, Integer> e, Object... args) {
+            e.setValue(newVal);
+
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b77f2a59/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledFullApiSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledFullApiSelfTest.java
new file mode 100644
index 0000000..b835845
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheAtomicNearEnabledFullApiSelfTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import org.apache.ignite.cache.*;
+
+import static org.apache.ignite.cache.GridCacheDistributionMode.*;
+
+/**
+ * Tests atomic cache with near cache enabled.
+ */
+public class GridCacheAtomicNearEnabledFullApiSelfTest extends GridCacheAtomicFullApiSelfTest {
+    /** {@inheritDoc} */
+    @Override protected GridCacheDistributionMode distributionMode() {
+        return NEAR_PARTITIONED;
+    }
+}


Mime
View raw message