ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [26/43] ignite git commit: IGNITE-2864 Need update local store from primary and backups
Date Thu, 19 May 2016 09:37:59 GMT
 IGNITE-2864  Need update local store from primary and backups


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9f2bafbc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9f2bafbc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9f2bafbc

Branch: refs/heads/ignite-3163
Commit: 9f2bafbcad81c169f33d04c8f04ee4c2fe4f1d36
Parents: 7f889ee
Author: Anton Vinogradov <av@apache.org>
Authored: Fri May 13 17:30:43 2016 +0300
Committer: Anton Vinogradov <av@apache.org>
Committed: Fri May 13 17:30:43 2016 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   3 +
 .../cache/GridCacheSharedContext.java           |  34 ++
 .../GridDistributedTxRemoteAdapter.java         |   7 +
 .../dht/atomic/GridDhtAtomicCache.java          |  17 +-
 .../dht/atomic/GridDhtAtomicUpdateFuture.java   |   6 +-
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  | 109 +++--
 .../store/GridCacheStoreManagerAdapter.java     |  29 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +
 .../cache/transactions/IgniteTxAdapter.java     | 252 ++++++++++
 .../IgniteTxImplicitSingleStateImpl.java        |   9 +-
 .../transactions/IgniteTxLocalAdapter.java      | 237 ---------
 .../cache/transactions/IgniteTxLocalEx.java     |   5 -
 .../IgniteTxRemoteSingleStateImpl.java          |  18 +-
 .../IgniteTxRemoteStateAdapter.java             |   9 -
 .../transactions/IgniteTxRemoteStateImpl.java   |  34 +-
 .../cache/transactions/IgniteTxStateImpl.java   |   2 +-
 .../CacheStoreUsageMultinodeAbstractTest.java   |   5 +-
 .../GridCacheAbstractLocalStoreSelfTest.java    | 478 +++++++++++++++++--
 .../GridCachePartitionedLocalStoreSelfTest.java |   6 -
 ...chePartitionedOffHeapLocalStoreSelfTest.java |   6 -
 .../GridCacheReplicatedLocalStoreSelfTest.java  |   6 -
 ...ridCacheTxPartitionedLocalStoreSelfTest.java |   6 -
 22 files changed, 912 insertions(+), 371 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 2b643b2..1af8b6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -381,6 +381,9 @@ public final class IgniteSystemProperties {
     /** Maximum number of nested listener calls before listener notification becomes asynchronous. */
     public static final String IGNITE_MAX_NESTED_LISTENER_CALLS = "IGNITE_MAX_NESTED_LISTENER_CALLS";
 
+    /** Indicating whether local store keeps primary only. Backward compatibility flag. */
+    public static final String IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY = "IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY";
+
     /**
      * Manages {@link OptimizedMarshaller} behavior of {@code serialVersionUID} computation for
      * {@link Serializable} classes.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index ef271f1..341f610 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -24,8 +24,10 @@ import java.util.ListIterator;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -53,6 +55,8 @@ import org.apache.ignite.lang.IgniteFuture;
 import org.apache.ignite.marshaller.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY;
+
 /**
  * Shared context.
  */
@@ -94,6 +98,12 @@ public class GridCacheSharedContext<K, V> {
     /** Store session listeners. */
     private Collection<CacheStoreSessionListener> storeSesLsnrs;
 
+    /** Local store count. */
+    private final AtomicInteger locStoreCnt;
+
+    /** Indicating whether local store keeps primary only. */
+    private final boolean locStorePrimaryOnly = IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY);
+
     /**
      * @param kernalCtx  Context.
      * @param txMgr Transaction manager.
@@ -125,6 +135,8 @@ public class GridCacheSharedContext<K, V> {
         txMetrics = new TransactionMetricsAdapter();
 
         ctxMap = new ConcurrentHashMap<>();
+
+        locStoreCnt = new AtomicInteger();
     }
 
     /**
@@ -242,6 +254,11 @@ public class GridCacheSharedContext<K, V> {
                 ", conflictingCacheName=" + existing.name() + ']');
         }
 
+        CacheStoreManager mgr = cacheCtx.store();
+
+        if (mgr.configured() && mgr.isLocal())
+            locStoreCnt.incrementAndGet();
+
         ctxMap.put(cacheCtx.cacheId(), cacheCtx);
     }
 
@@ -253,6 +270,11 @@ public class GridCacheSharedContext<K, V> {
 
         ctxMap.remove(cacheId, cacheCtx);
 
+        CacheStoreManager mgr = cacheCtx.store();
+
+        if (mgr.configured() && mgr.isLocal())
+            locStoreCnt.decrementAndGet();
+
         // Safely clean up the message listeners.
         ioMgr.removeHandlers(cacheId);
     }
@@ -464,6 +486,13 @@ public class GridCacheSharedContext<K, V> {
     }
 
     /**
+     * @return Count of caches with configured local stores.
+     */
+    public int getLocalStoreCount() {
+        return locStoreCnt.get();
+    }
+
+    /**
      * @param nodeId Node ID.
      * @return Node or {@code null}.
      */
@@ -471,6 +500,11 @@ public class GridCacheSharedContext<K, V> {
         return kernalCtx.discovery().node(nodeId);
     }
 
+    /** Indicating whether local store keeps primary only. */
+    public boolean localStorePrimaryOnly() {
+        return locStorePrimaryOnly;
+    }
+
     /**
      * Gets grid logger for given class.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
index 1fd0b2e..51be6e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java
@@ -452,6 +452,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
 
                     AffinityTopologyVersion topVer = topologyVersion();
 
+                    batchStoreCommit(writeMap().values());
+
                     // Node that for near transactions we grab all entries.
                     for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) {
                         GridCacheContext cacheCtx = txEntry.context();
@@ -770,6 +772,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter
         return explicitVers == null ? Collections.<GridCacheVersion>emptyList() : explicitVers;
     }
 
+    /** {@inheritDoc} */
+    @Override public void commitError(Throwable e) {
+        // No-op.
+    }
+
     /**
      * Adds explicit version if there is one.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 25e4e3f..6aad533 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -144,7 +144,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** Update reply closure. */
     private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos;
 
-    /** Pending  */
+    /** Pending */
     private ConcurrentMap<UUID, DeferredResponseBuffer> pendingResponses = new ConcurrentHashMap8<>();
 
     /** */
@@ -1414,10 +1414,11 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                         GridCacheReturn retVal = null;
 
-                        if (keys.size() > 1 &&                             // Several keys ...
-                            writeThrough() && !req.skipStore() &&          // and store is enabled ...
-                            !ctx.store().isLocal() &&                      // and this is not local store ...
-                            !ctx.dr().receiveEnabled()                     // and no DR.
+                        if (keys.size() > 1 &&                    // Several keys ...
+                            writeThrough() && !req.skipStore() && // and store is enabled ...
+                            !ctx.store().isLocal() &&             // and this is not local store ...
+                                                                  // (conflict resolver should be used for local store)
+                            !ctx.dr().receiveEnabled()            // and no DR.
                             ) {
                             // This method can only be used when there are no replicated entries in the batch.
                             UpdateBatchResult updRes = updateWithBatch(node,
@@ -2043,7 +2044,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                     op,
                     writeVal,
                     req.invokeArguments(),
-                    primary && writeThrough() && !req.skipStore(),
+                    (primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
+                        && writeThrough() && !req.skipStore(),
                     !req.skipStore(),
                     sndPrevVal || req.returnValue(),
                     req.keepBinary(),
@@ -2801,7 +2803,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
                             op,
                             op == TRANSFORM ? entryProcessor : val,
                             op == TRANSFORM ? req.invokeArguments() : null,
-                            /*write-through*/false,
+                            /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly())
+                                && writeThrough() && !req.skipStore(),
                             /*read-through*/false,
                             /*retval*/false,
                             req.keepBinary(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
index 47f6cb2..df44455 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java
@@ -262,7 +262,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                         this.updateReq.taskNameHash(),
                         forceTransformBackups ? this.updateReq.invokeArguments() : null,
                         cctx.deploymentEnabled(),
-                        this.updateReq.keepBinary());
+                        this.updateReq.keepBinary(),
+                        this.updateReq.skipStore());
 
                     mappings.put(nodeId, updateReq);
                 }
@@ -323,7 +324,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void>
                     this.updateReq.taskNameHash(),
                     forceTransformBackups ? this.updateReq.invokeArguments() : null,
                     cctx.deploymentEnabled(),
-                    this.updateReq.keepBinary());
+                    this.updateReq.keepBinary(),
+                    this.updateReq.skipStore());
 
                 mappings.put(nodeId, updateReq);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index 126cd83..ed50f41 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -46,6 +46,8 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;
+
 /**
  * Lite dht cache backup update request.
  */
@@ -57,6 +59,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     public static final int CACHE_MSG_IDX = nextIndexId();
 
     /** Node ID. */
+    @GridDirectTransient
     private UUID nodeId;
 
     /** Future version. */
@@ -159,6 +162,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     private boolean keepBinary;
 
     /**
+     * Additional flags.
+     */
+    private byte flags;
+
+    /**
      * Empty constructor required by {@link Externalizable}.
      */
     public GridDhtAtomicUpdateRequest() {
@@ -192,7 +200,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         int taskNameHash,
         Object[] invokeArgs,
         boolean addDepInfo,
-        boolean keepBinary
+        boolean keepBinary,
+        boolean skipStore
     ) {
         assert invokeArgs == null || forceTransformBackups;
 
@@ -209,6 +218,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
         this.addDepInfo = addDepInfo;
         this.keepBinary = keepBinary;
 
+        if (skipStore)
+            flags = (byte)(flags | SKIP_STORE_FLAG_MASK);
+
         keys = new ArrayList<>();
         partIds = new ArrayList<>();
 
@@ -465,6 +477,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
     }
 
     /**
+     * @return Skip write-through to a persistent storage.
+     */
+    public boolean skipStore() {
+        return (flags & SKIP_STORE_FLAG_MASK) == SKIP_STORE_FLAG_MASK;
+    }
+
+    /**
      * @param updCntr Update counter.
      * @return Update counter.
      */
@@ -717,114 +736,120 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
+                if (!writer.writeByte("flags", flags))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeMessage("futVer", futVer))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
+                if (!writer.writeBoolean("keepBinary", keepBinary))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
+                if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearExpireTimes", nearExpireTimes))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeMessage("nearTtls", nearTtls))
+                if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("nearTtls", nearTtls))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
+                if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("ttls", ttls))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 22:
-                if (!writer.writeMessage("updateCntrs", updateCntrs))
+                if (!writer.writeMessage("ttls", ttls))
                     return false;
 
                 writer.incrementState();
 
             case 23:
-                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                if (!writer.writeMessage("updateCntrs", updateCntrs))
                     return false;
 
                 writer.incrementState();
 
             case 24:
+                if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG))
+                    return false;
+
+                writer.incrementState();
+
+            case 25:
                 if (!writer.writeMessage("writeVer", writeVer))
                     return false;
 
@@ -871,7 +896,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 6:
-                forceTransformBackups = reader.readBoolean("forceTransformBackups");
+                flags = reader.readByte("flags");
 
                 if (!reader.isLastRead())
                     return false;
@@ -879,7 +904,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 7:
-                futVer = reader.readMessage("futVer");
+                forceTransformBackups = reader.readBoolean("forceTransformBackups");
 
                 if (!reader.isLastRead())
                     return false;
@@ -887,7 +912,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 8:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
+                futVer = reader.readMessage("futVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -895,7 +920,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 9:
-                keepBinary = reader.readBoolean("keepBinary");
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -903,7 +928,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 10:
-                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
+                keepBinary = reader.readBoolean("keepBinary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -911,7 +936,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 11:
-                nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
+                keys = reader.readCollection("keys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -919,7 +944,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 12:
-                nearExpireTimes = reader.readMessage("nearExpireTimes");
+                nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR);
 
                 if (!reader.isLastRead())
                     return false;
@@ -927,7 +952,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 13:
-                nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
+                nearExpireTimes = reader.readMessage("nearExpireTimes");
 
                 if (!reader.isLastRead())
                     return false;
@@ -935,7 +960,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 14:
-                nearTtls = reader.readMessage("nearTtls");
+                nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -943,7 +968,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 15:
-                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
+                nearTtls = reader.readMessage("nearTtls");
 
                 if (!reader.isLastRead())
                     return false;
@@ -951,7 +976,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 16:
-                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
+                nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -959,7 +984,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 17:
-                subjId = reader.readUuid("subjId");
+                prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
                     return false;
@@ -967,6 +992,14 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
                 reader.incrementState();
 
             case 18:
+                subjId = reader.readUuid("subjId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 19:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -978,7 +1011,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -986,7 +1019,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -994,7 +1027,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 ttls = reader.readMessage("ttls");
 
                 if (!reader.isLastRead())
@@ -1002,7 +1035,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 22:
+            case 23:
                 updateCntrs = reader.readMessage("updateCntrs");
 
                 if (!reader.isLastRead())
@@ -1010,7 +1043,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 23:
+            case 24:
                 vals = reader.readCollection("vals", MessageCollectionItemType.MSG);
 
                 if (!reader.isLastRead())
@@ -1018,7 +1051,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
                 reader.incrementState();
 
-            case 24:
+            case 25:
                 writeVer = reader.readMessage("writeVer");
 
                 if (!reader.isLastRead())
@@ -1051,7 +1084,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 25;
+        return 26;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
index 7ffebbd..18d9c7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/store/GridCacheStoreManagerAdapter.java
@@ -31,9 +31,11 @@ import javax.cache.Cache;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.integration.CacheWriterException;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteSystemProperties;
 import org.apache.ignite.cache.store.CacheStore;
 import org.apache.ignite.cache.store.CacheStoreSession;
 import org.apache.ignite.cache.store.CacheStoreSessionListener;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
@@ -60,12 +62,14 @@ import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.lifecycle.LifecycleAware;
 import org.apache.ignite.transactions.Transaction;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY;
+
 /**
  * Store manager.
  */
@@ -74,6 +78,13 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
     /** */
     private static final int SES_ATTR = GridMetadataAwareAdapter.EntryKey.CACHE_STORE_MANAGER_KEY.key();
 
+    /**
+     * Behavior can be changed by setting {@link IgniteSystemProperties#IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY} property
+     * to {@code True}.
+     */
+    private static final IgniteProductVersion LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE =
+        IgniteProductVersion.fromString("1.5.22");
+
     /** */
     protected CacheStore<Object, Object> store;
 
@@ -217,6 +228,22 @@ public abstract class GridCacheStoreManagerAdapter extends GridCacheManagerAdapt
 
             globalSesLsnrs = true;
         }
+
+        if (isLocal()) {
+            for (ClusterNode node : cctx.kernalContext().cluster().get().forRemotes().nodes()) {
+                if (LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE.compareTo(node.version()) > 0 &&
+                    !IgniteSystemProperties.getBoolean(IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY)) {
+                    IgniteProductVersion v = LOCAL_STORE_KEEPS_PRIMARY_AND_BACKUPS_SINCE;
+
+                    log.warning("Since Ignite " + v.major() + "." + v.minor() + "." + v.maintenance() +
+                        " Local Store keeps primary and backup partitions. " +
+                        "To keep primary partitions only please set system property " +
+                        IGNITE_LOCAL_STORE_KEEPS_PRIMARY_ONLY + " to 'true'.");
+
+                    break;
+                }
+            }
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index f5f99f5..07b21ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -719,4 +719,9 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject {
      * @param topVer New topology version.
      */
     public void onRemap(AffinityTopologyVersion topVer);
+
+    /**
+     * @param e Commit error.
+     */
+    public void commitError(Throwable e);
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index 3286689..66d4df0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.io.ObjectStreamException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -40,6 +43,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
+import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -49,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.processors.cache.version.GridCachePlainVersionedEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
@@ -1211,6 +1216,248 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
     }
 
     /**
+     * @param stores Store managers.
+     * @return If {@code isWriteToStoreFromDht} value same for all stores.
+     */
+    protected boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) {
+        if (stores != null && !stores.isEmpty()) {
+            boolean exp = F.first(stores).isWriteToStoreFromDht();
+
+            for (CacheStoreManager store : stores) {
+                if (store.isWriteToStoreFromDht() != exp)
+                    return false;
+            }
+        }
+
+        return true;
+    }
+
+    /**
+     * @param stores Store managers.
+     * @param commit Commit flag.
+     * @throws IgniteCheckedException In case of error.
+     */
+    protected void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException {
+        Iterator<CacheStoreManager> it = stores.iterator();
+
+        while (it.hasNext()) {
+            CacheStoreManager store = it.next();
+
+            store.sessionEnd(this, commit, !it.hasNext());
+        }
+    }
+
+    /**
+     * Performs batch database operations. This commit must be called
+     * before cache update. This way if there is a DB failure,
+     * cache transaction can still be rolled back.
+     *
+     * @param writeEntries Transaction write set.
+     * @throws IgniteCheckedException If batch update failed.
+     */
+    @SuppressWarnings({"CatchGenericClass"})
+    protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
+        if (!storeEnabled() || internal() ||
+            (!local() && near())) // No need to work with local store at GridNearTxRemote.
+            return;
+
+        Collection<CacheStoreManager> stores = txState().stores(cctx);
+
+        if (stores == null || stores.isEmpty())
+            return;
+
+        assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction";
+
+        CacheStoreManager first = F.first(stores);
+
+        boolean isWriteToStoreFromDht = first.isWriteToStoreFromDht();
+
+        if ((local() || first.isLocal()) && (near() || isWriteToStoreFromDht)) {
+            try {
+                if (writeEntries != null) {
+                    Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
+                    List<Object> rmvCol = null;
+                    CacheStoreManager writeStore = null;
+
+                    boolean skipNonPrimary = near() && isWriteToStoreFromDht;
+
+                    for (IgniteTxEntry e : writeEntries) {
+                        boolean skip = e.skipStore();
+
+                        if (!skip && skipNonPrimary) {
+                            skip = e.cached().isNear() ||
+                                e.cached().detached() ||
+                                !e.context().affinity().primary(e.cached().partition(), topologyVersion()).isLocal();
+                        }
+
+                        if (!skip && !local() && // Update local store at backups only if needed.
+                            cctx.localStorePrimaryOnly())
+                            skip = true;
+
+                        if (skip)
+                            continue;
+
+                        boolean intercept = e.context().config().getInterceptor() != null;
+
+                        if (intercept || !F.isEmpty(e.entryProcessors()))
+                            e.cached().unswap(false);
+
+                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
+
+                        GridCacheContext cacheCtx = e.context();
+
+                        GridCacheOperation op = res.get1();
+                        KeyCacheObject key = e.key();
+                        CacheObject val = res.get2();
+                        GridCacheVersion ver = writeVersion();
+
+                        if (op == CREATE || op == UPDATE) {
+                            // Batch-process all removes if needed.
+                            if (rmvCol != null && !rmvCol.isEmpty()) {
+                                assert writeStore != null;
+
+                                writeStore.removeAll(this, rmvCol);
+
+                                // Reset.
+                                rmvCol.clear();
+
+                                writeStore = null;
+                            }
+
+                            // Batch-process puts if cache ID has changed.
+                            if (writeStore != null && writeStore != cacheCtx.store()) {
+                                if (putMap != null && !putMap.isEmpty()) {
+                                    writeStore.putAll(this, putMap);
+
+                                    // Reset.
+                                    putMap.clear();
+                                }
+
+                                writeStore = null;
+                            }
+
+                            if (intercept) {
+                                Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut(
+                                    new CacheLazyEntry(
+                                        cacheCtx,
+                                        key,
+                                        e.cached().rawGetOrUnmarshal(true),
+                                        e.keepBinary()),
+                                    cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false));
+
+                                if (interceptorVal == null)
+                                    continue;
+
+                                val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal));
+                            }
+
+                            if (writeStore == null)
+                                writeStore = cacheCtx.store();
+
+                            if (writeStore.isWriteThrough()) {
+                                if (putMap == null)
+                                    putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
+
+                                putMap.put(key, F.<Object, GridCacheVersion>t(val, ver));
+                            }
+                        }
+                        else if (op == DELETE) {
+                            // Batch-process all puts if needed.
+                            if (putMap != null && !putMap.isEmpty()) {
+                                assert writeStore != null;
+
+                                writeStore.putAll(this, putMap);
+
+                                // Reset.
+                                putMap.clear();
+
+                                writeStore = null;
+                            }
+
+                            if (writeStore != null && writeStore != cacheCtx.store()) {
+                                if (rmvCol != null && !rmvCol.isEmpty()) {
+                                    writeStore.removeAll(this, rmvCol);
+
+                                    // Reset.
+                                    rmvCol.clear();
+                                }
+
+                                writeStore = null;
+                            }
+
+                            if (intercept) {
+                                IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor().onBeforeRemove(
+                                    new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true), e.keepBinary()));
+
+                                if (cacheCtx.cancelRemove(t))
+                                    continue;
+                            }
+
+                            if (writeStore == null)
+                                writeStore = cacheCtx.store();
+
+                            if (writeStore.isWriteThrough()) {
+                                if (rmvCol == null)
+                                    rmvCol = new ArrayList<>();
+
+                                rmvCol.add(key);
+                            }
+                        }
+                        else if (log.isDebugEnabled())
+                            log.debug("Ignoring NOOP entry for batch store commit: " + e);
+                    }
+
+                    if (putMap != null && !putMap.isEmpty()) {
+                        assert rmvCol == null || rmvCol.isEmpty();
+                        assert writeStore != null;
+
+                        // Batch put at the end of transaction.
+                        writeStore.putAll(this, putMap);
+                    }
+
+                    if (rmvCol != null && !rmvCol.isEmpty()) {
+                        assert putMap == null || putMap.isEmpty();
+                        assert writeStore != null;
+
+                        // Batch remove at the end of transaction.
+                        writeStore.removeAll(this, rmvCol);
+                    }
+                }
+
+                // Commit while locks are held.
+                sessionEnd(stores, true);
+            }
+            catch (IgniteCheckedException ex) {
+                commitError(ex);
+
+                setRollbackOnly();
+
+                // Safe to remove transaction from committed tx list because nothing was committed yet.
+                cctx.tm().removeCommittedTx(this);
+
+                throw ex;
+            }
+            catch (Throwable ex) {
+                commitError(ex);
+
+                setRollbackOnly();
+
+                // Safe to remove transaction from committed tx list because nothing was committed yet.
+                cctx.tm().removeCommittedTx(this);
+
+                if (ex instanceof Error)
+                    throw (Error)ex;
+
+                throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
+            }
+            finally {
+                if (isRollbackOnly())
+                    sessionEnd(stores, false);
+            }
+        }
+    }
+
+    /**
      * @param txEntry Entry to process.
      * @param metrics {@code True} if metrics should be updated.
      * @return Tuple containing transformation results.
@@ -1788,6 +2035,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public void commitError(Throwable e) {
+            throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
+        }
+
+        /** {@inheritDoc} */
         @Override public boolean empty() {
             throw new IllegalStateException("Deserialized transaction can only be used as read-only.");
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
index 3e0231e..66069a7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitSingleStateImpl.java
@@ -163,13 +163,8 @@ public class IgniteTxImplicitSingleStateImpl extends IgniteTxLocalStateAdapter {
 
         CacheStoreManager store = cacheCtx.store();
 
-        if (store.configured()) {
-            HashSet<CacheStoreManager> set = new HashSet<>(3, 0.75f);
-
-            set.add(store);
-
-            return set;
-        }
+        if (store.configured())
+            return Collections.singleton(store);
 
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index 962f2d5..312d3a8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -25,7 +25,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -42,7 +41,6 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheInvokeEntry;
-import org.apache.ignite.internal.processors.cache.CacheLazyEntry;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.CacheOperationContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -629,209 +627,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
         return cacheCtx.cache().entryEx(key.key(), topVer);
     }
 
-    /**
-     * Performs batch database operations. This commit must be called
-     * before {@link #userCommit()}. This way if there is a DB failure,
-     * cache transaction can still be rolled back.
-     *
-     * @param writeEntries Transaction write set.
-     * @throws IgniteCheckedException If batch update failed.
-     */
-    @SuppressWarnings({"CatchGenericClass"})
-    protected void batchStoreCommit(Iterable<IgniteTxEntry> writeEntries) throws IgniteCheckedException {
-        if (!storeEnabled() || internal())
-            return;
-
-        Collection<CacheStoreManager> stores = txState.stores(cctx);
-
-        if (stores == null || stores.isEmpty())
-            return;
-
-        assert isWriteToStoreFromDhtValid(stores) : "isWriteToStoreFromDht can't be different within one transaction";
-
-        boolean isWriteToStoreFromDht = F.first(stores).isWriteToStoreFromDht();
-
-        if (near() || isWriteToStoreFromDht) {
-            try {
-                if (writeEntries != null) {
-                    Map<Object, IgniteBiTuple<Object, GridCacheVersion>> putMap = null;
-                    List<Object> rmvCol = null;
-                    CacheStoreManager writeStore = null;
-
-                    boolean skipNonPrimary = near() && isWriteToStoreFromDht;
-
-                    for (IgniteTxEntry e : writeEntries) {
-                        boolean skip = e.skipStore();
-
-                        if (!skip && skipNonPrimary) {
-                            skip = e.cached().isNear() ||
-                                e.cached().detached() ||
-                                !e.context().affinity().primary(e.cached().partition(), topologyVersion()).isLocal();
-                        }
-
-                        if (skip)
-                            continue;
-
-                        boolean intercept = e.context().config().getInterceptor() != null;
-
-                        if (intercept || !F.isEmpty(e.entryProcessors()))
-                            e.cached().unswap(false);
-
-                        IgniteBiTuple<GridCacheOperation, CacheObject> res = applyTransformClosures(e, false);
-
-                        GridCacheContext cacheCtx = e.context();
-
-                        GridCacheOperation op = res.get1();
-                        KeyCacheObject key = e.key();
-                        CacheObject val = res.get2();
-                        GridCacheVersion ver = writeVersion();
-
-                        if (op == CREATE || op == UPDATE) {
-                            // Batch-process all removes if needed.
-                            if (rmvCol != null && !rmvCol.isEmpty()) {
-                                assert writeStore != null;
-
-                                writeStore.removeAll(this, rmvCol);
-
-                                // Reset.
-                                rmvCol.clear();
-
-                                writeStore = null;
-                            }
-
-                            // Batch-process puts if cache ID has changed.
-                            if (writeStore != null && writeStore != cacheCtx.store()) {
-                                if (putMap != null && !putMap.isEmpty()) {
-                                    writeStore.putAll(this, putMap);
-
-                                    // Reset.
-                                    putMap.clear();
-                                }
-
-                                writeStore = null;
-                            }
-
-                            if (intercept) {
-                                Object interceptorVal = cacheCtx.config().getInterceptor().onBeforePut(
-                                    new CacheLazyEntry(
-                                        cacheCtx,
-                                        key,
-                                        e.cached().rawGetOrUnmarshal(true),
-                                        e.keepBinary()),
-                                    cacheCtx.cacheObjectContext().unwrapBinaryIfNeeded(val, e.keepBinary(), false));
-
-                                if (interceptorVal == null)
-                                    continue;
-
-                                val = cacheCtx.toCacheObject(cacheCtx.unwrapTemporary(interceptorVal));
-                            }
-
-                            if (writeStore == null)
-                                writeStore = cacheCtx.store();
-
-                            if (writeStore.isWriteThrough()) {
-                                if (putMap == null)
-                                    putMap = new LinkedHashMap<>(writeMap().size(), 1.0f);
-
-                                putMap.put(key, F.<Object, GridCacheVersion>t(val, ver));
-                            }
-                        }
-                        else if (op == DELETE) {
-                            // Batch-process all puts if needed.
-                            if (putMap != null && !putMap.isEmpty()) {
-                                assert writeStore != null;
-
-                                writeStore.putAll(this, putMap);
-
-                                // Reset.
-                                putMap.clear();
-
-                                writeStore = null;
-                            }
-
-                            if (writeStore != null && writeStore != cacheCtx.store()) {
-                                if (rmvCol != null && !rmvCol.isEmpty()) {
-                                    writeStore.removeAll(this, rmvCol);
-
-                                    // Reset.
-                                    rmvCol.clear();
-                                }
-
-                                writeStore = null;
-                            }
-
-                            if (intercept) {
-                                IgniteBiTuple<Boolean, Object> t = cacheCtx.config().getInterceptor().onBeforeRemove(
-                                    new CacheLazyEntry(cacheCtx, key, e.cached().rawGetOrUnmarshal(true), e.keepBinary()));
-
-                                if (cacheCtx.cancelRemove(t))
-                                    continue;
-                            }
-
-                            if (writeStore == null)
-                                writeStore = cacheCtx.store();
-
-                            if (writeStore.isWriteThrough()) {
-                                if (rmvCol == null)
-                                    rmvCol = new ArrayList<>();
-
-                                rmvCol.add(key);
-                            }
-                        }
-                        else if (log.isDebugEnabled())
-                            log.debug("Ignoring NOOP entry for batch store commit: " + e);
-                    }
-
-                    if (putMap != null && !putMap.isEmpty()) {
-                        assert rmvCol == null || rmvCol.isEmpty();
-                        assert writeStore != null;
-
-                        // Batch put at the end of transaction.
-                        writeStore.putAll(this, putMap);
-                    }
-
-                    if (rmvCol != null && !rmvCol.isEmpty()) {
-                        assert putMap == null || putMap.isEmpty();
-                        assert writeStore != null;
-
-                        // Batch remove at the end of transaction.
-                        writeStore.removeAll(this, rmvCol);
-                    }
-                }
-
-                // Commit while locks are held.
-                sessionEnd(stores, true);
-            }
-            catch (IgniteCheckedException ex) {
-                commitError(ex);
-
-                setRollbackOnly();
-
-                // Safe to remove transaction from committed tx list because nothing was committed yet.
-                cctx.tm().removeCommittedTx(this);
-
-                throw ex;
-            }
-            catch (Throwable ex) {
-                commitError(ex);
-
-                setRollbackOnly();
-
-                // Safe to remove transaction from committed tx list because nothing was committed yet.
-                cctx.tm().removeCommittedTx(this);
-
-                if (ex instanceof Error)
-                    throw (Error)ex;
-
-                throw new IgniteCheckedException("Failed to commit transaction to database: " + this, ex);
-            }
-            finally {
-                if (isRollbackOnly())
-                    sessionEnd(stores, false);
-            }
-        }
-    }
-
     /** {@inheritDoc} */
     @SuppressWarnings({"CatchGenericClass"})
     @Override public void userCommit() throws IgniteCheckedException {
@@ -1314,21 +1109,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
-     * @param stores Store managers.
-     * @param commit Commit flag.
-     * @throws IgniteCheckedException In case of error.
-     */
-    private void sessionEnd(Collection<CacheStoreManager> stores, boolean commit) throws IgniteCheckedException {
-        Iterator<CacheStoreManager> it = stores.iterator();
-
-        while (it.hasNext()) {
-            CacheStoreManager store = it.next();
-
-            store.sessionEnd(this, commit, !it.hasNext());
-        }
-    }
-
-    /**
      * @param entry Entry.
      * @return {@code True} if local node is current primary for given entry.
      */
@@ -3989,23 +3769,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter
     }
 
     /**
-     * @param stores Store managers.
-     * @return If {@code isWriteToStoreFromDht} value same for all stores.
-     */
-    private boolean isWriteToStoreFromDhtValid(Collection<CacheStoreManager> stores) {
-        if (stores != null && !stores.isEmpty()) {
-            boolean exp = F.first(stores).isWriteToStoreFromDht();
-
-            for (CacheStoreManager store : stores) {
-                if (store.isWriteToStoreFromDht() != exp)
-                    return false;
-            }
-        }
-
-        return true;
-    }
-
-    /**
      * Post-lock closure alias.
      *
      * @param <T> Return type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
index 78f517c..d4350d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java
@@ -47,11 +47,6 @@ public interface IgniteTxLocalEx extends IgniteInternalTx {
     @Nullable public Throwable commitError();
 
     /**
-     * @param e Commit error.
-     */
-    public void commitError(Throwable e);
-
-    /**
      * @throws IgniteCheckedException If commit failed.
      */
     public void userCommit() throws IgniteCheckedException;

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
index a68006b..eb39edd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteSingleStateImpl.java
@@ -20,9 +20,10 @@ package org.apache.ignite.internal.processors.cache.transactions;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
@@ -114,4 +115,19 @@ public class IgniteTxRemoteSingleStateImpl extends IgniteTxRemoteStateAdapter {
     public String toString() {
         return S.toString(IgniteTxRemoteSingleStateImpl.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
+        if (entry == null)
+            return null;
+
+        CacheStoreManager store = entry.context().store();
+
+        if (store.configured()
+            && store.isLocal()) { // Only local stores take part at tx on backup node.
+            return Collections.singleton(store);
+        }
+
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
index 3e5034b..eaeaccc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateAdapter.java
@@ -17,12 +17,10 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
-import java.util.Collection;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
-import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.jetbrains.annotations.Nullable;
 
@@ -97,13 +95,6 @@ public abstract class IgniteTxRemoteStateAdapter implements IgniteTxRemoteState
     }
 
     /** {@inheritDoc} */
-    @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
-        assert false;
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
     @Nullable @Override public GridCacheContext singleCacheContext(GridCacheSharedContext cctx) {
         return null;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
index 3335b44..4f46831 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteStateImpl.java
@@ -17,13 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -145,4 +147,34 @@ public class IgniteTxRemoteStateImpl extends IgniteTxRemoteStateAdapter {
     public String toString() {
         return S.toString(IgniteTxRemoteStateImpl.class, this);
     }
+
+    /** {@inheritDoc} */
+    @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) {
+        int locStoreCnt = cctx.getLocalStoreCount();
+
+        if (locStoreCnt > 0 && !writeMap.isEmpty()) {
+            Collection<CacheStoreManager> stores = null;
+
+            for (IgniteTxEntry e : writeMap.values()) {
+                if (e.skipStore())
+                    continue;
+
+                CacheStoreManager store = e.context().store();
+
+                if (store.configured() && store.isLocal()) {
+                    if (stores == null)
+                        stores = new ArrayList<>(locStoreCnt);
+
+                    stores.add(store);
+
+                    if (stores.size() == locStoreCnt)
+                        break;
+                }
+            }
+
+            return stores;
+        }
+
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index 1256aa6..f08377b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -47,8 +47,8 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter {
     /** Active cache IDs. */
     private GridLongList activeCacheIds = new GridLongList();
-    /** Per-transaction read map. */
 
+    /** Per-transaction read map. */
     @GridToStringInclude
     protected Map<IgniteTxKey, IgniteTxEntry> txMap;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/9f2bafbc/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
index 9c4b7b2..eaae2f9 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheStoreUsageMultinodeAbstractTest.java
@@ -242,9 +242,10 @@ public abstract class CacheStoreUsageMultinodeAbstractTest extends GridCommonAbs
 
         assertTrue("Store is not updated", wait);
 
-        assertEquals("Write on wrong node: " + writeMap, 1, writeMap.size());
+        assertEquals("Write on wrong node: " + writeMap, locStore ? 2 : 1, writeMap.size());
 
-        assertEquals(expNode, writeMap.keySet().iterator().next());
+        if (!locStore)
+            assertEquals(expNode, writeMap.keySet().iterator().next());
 
         writeMap.clear();
     }


Mime
View raw message