ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-6149
Date Mon, 18 Sep 2017 14:01:20 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-3478 6d747761e -> 30421e399


ignite-6149


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/30421e39
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/30421e39
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/30421e39

Branch: refs/heads/ignite-3478
Commit: 30421e3993fb8a6b94c7b9fa5daf6909b449d52d
Parents: 6d74776
Author: sboikov <sboikov@gridgain.com>
Authored: Mon Sep 18 17:01:09 2017 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Mon Sep 18 17:01:09 2017 +0300

----------------------------------------------------------------------
 .../communication/GridIoMessageFactory.java     |   6 +
 .../processors/cache/GridCacheAdapter.java      |   2 +
 .../processors/cache/GridCacheEntryEx.java      |   2 +
 .../processors/cache/GridCacheEntryInfo.java    |  38 +++++-
 .../processors/cache/GridCacheMapEntry.java     |  10 +-
 .../cache/GridCacheMvccEntryInfo.java           | 133 +++++++++++++++++++
 .../distributed/dht/GridDhtCacheAdapter.java    |   2 +
 .../distributed/dht/GridDhtLockFuture.java      |   6 +-
 .../distributed/dht/GridDhtTxPrepareFuture.java |   1 +
 .../dht/preloader/GridDhtForceKeysFuture.java   |   1 +
 .../dht/preloader/GridDhtPartitionDemander.java |   1 +
 .../dht/preloader/GridDhtPartitionSupplier.java |   6 +-
 .../datastreamer/DataStreamerImpl.java          |   2 +
 .../processors/cache/GridCacheTestEntryEx.java  |   1 +
 .../cache/mvcc/CacheMvccTransactionsTest.java   |  74 ++++++++++-
 15 files changed, 275 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index cf3bd2a..9bd04fa 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -43,6 +43,7 @@ import org.apache.ignite.internal.processors.cache.CacheInvokeDirectResult;
 import org.apache.ignite.internal.processors.cache.CacheObjectByteArrayImpl;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridChangeGlobalStateMessageResponse;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
@@ -917,6 +918,11 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 return msg;
 
+            case 138:
+                msg = new GridCacheMvccEntryInfo();
+
+                return msg;
+
 
             // [-3..119] [124..128] [-23..-27] [-36..-55]- this
             // [120..123] - DR

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 838903a..4b68564 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3482,8 +3482,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         GridCacheEntryEx entry = entryEx(key);
 
         try {
+            // TODO IGNITE-3478 (mvcc ver)
             entry.initialValue(cacheVal,
                 ver,
+                null,
                 ttl,
                 CU.EXPIRE_TIME_CALCULATE,
                 false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
index 18130de..8b9b77d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java
@@ -659,6 +659,7 @@ public interface GridCacheEntryEx {
      *
      * @param val New value.
      * @param ver Version to use.
+     * @param mvccVer Mvcc version.
      * @param ttl Time to live.
      * @param expireTime Expiration time.
      * @param preload Flag indicating whether entry is being preloaded.
@@ -671,6 +672,7 @@ public interface GridCacheEntryEx {
      */
     public boolean initialValue(CacheObject val,
         GridCacheVersion ver,
+        @Nullable MvccCoordinatorVersion mvccVer,
         long ttl,
         long expireTime,
         boolean preload,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
index 7371153..e09d33c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryInfo.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.processors.cache;
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
+import org.apache.ignite.internal.processors.cache.mvcc.MvccLongList;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
@@ -31,7 +33,7 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 /**
  * Entry information that gets passed over wire.
  */
-public class GridCacheEntryInfo implements Message {
+public class GridCacheEntryInfo implements Message, MvccCoordinatorVersion {
     /** */
     private static final int SIZE_OVERHEAD = 3 * 8 /* reference */ + 4 /* int */ + 2 * 8 /* long */ + 32 /* version */;
 
@@ -66,6 +68,40 @@ public class GridCacheEntryInfo implements Message {
     private boolean deleted;
 
     /**
+     * @param mvccCrdVer Mvcc coordinator version.
+     */
+    public void mvccCoordinatorVersion(long mvccCrdVer) {
+        // No-op.
+    }
+
+    /**
+     * @param mvccCntr Mvcc counter.
+     */
+    public void mvccCounter(long mvccCntr) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public MvccLongList activeTransactions() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long coordinatorVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long cleanupVersion() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long counter() {
+        return 0;
+    }
+
+    /**
      * @return Cache ID.
      */
     public int cacheId() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 3a42a98..57c77fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras;
-import org.apache.ignite.internal.processors.cache.mvcc.CacheCoordinatorsSharedManager;
 import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinatorVersion;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
@@ -2553,6 +2552,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
     @Override public boolean initialValue(
         CacheObject val,
         GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer,
         long ttl,
         long expireTime,
         boolean preload,
@@ -2591,8 +2591,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
 
                 val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-                if (val != null)
-                    storeValue(val, expTime, ver, null);
+                if (val != null) {
+                    if (cctx.mvccEnabled())
+                        cctx.offheap().mvccUpdate(this, val, ver, mvccVer);
+                    else
+                        storeValue(val, expTime, ver, null);
+                }
 
                 update(val, expTime, ttl, ver, true);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
new file mode 100644
index 0000000..c914f58
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccEntryInfo.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ *
+ */
+public class GridCacheMvccEntryInfo extends GridCacheEntryInfo {
+    /** */
+    private long mvccCrdVer;
+
+    /** */
+    private long mvccCntr;
+
+    /** {@inheritDoc} */
+    @Override public void mvccCoordinatorVersion(long mvccCrdVer) {
+        this.mvccCrdVer = mvccCrdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void mvccCounter(long mvccCntr) {
+        this.mvccCntr = mvccCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long coordinatorVersion() {
+        return mvccCrdVer;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long counter() {
+        return mvccCntr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 8;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!super.writeTo(buf, writer))
+            return false;
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 6:
+                if (!writer.writeLong("mvccCntr", mvccCntr))
+                    return false;
+
+                writer.incrementState();
+
+            case 7:
+                if (!writer.writeLong("mvccCrdVer", mvccCrdVer))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        if (!super.readFrom(buf, reader))
+            return false;
+
+        switch (reader.state()) {
+            case 6:
+                mvccCntr = reader.readLong("mvccCntr");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 7:
+                mvccCrdVer = reader.readLong("mvccCrdVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(GridCacheMvccEntryInfo.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public short directType() {
+        return 138;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(GridCacheMvccEntryInfo.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 1e5b200..ac04e4b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -657,8 +657,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
 
                     entry = entryEx(key);
 
+                    // TODO IGNITE-3478 (mvcc ver)
                     entry.initialValue(cacheVal,
                         ver,
+                        null,
                         ttl,
                         CU.EXPIRE_TIME_CALCULATE,
                         false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
index e0a0260..5b15c29 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java
@@ -1077,8 +1077,10 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
                                     expireTime = CU.toExpireTime(ttl);
                                 }
 
+                                // TODO IGNITE-3478 (mvcc ver)
                                 entry0.initialValue(val0,
                                     ver,
+                                    null,
                                     ttl,
                                     expireTime,
                                     false,
@@ -1258,9 +1260,11 @@ public final class GridDhtLockFuture extends GridCacheCompoundIdentityFuture<Boo
                         try {
                             if (entry.initialValue(info.value(),
                                 info.version(),
+                                info,
                                 info.ttl(),
                                 info.expireTime(),
-                                true, topVer,
+                                true,
+                                topVer,
                                 replicate ? DR_PRELOAD : DR_NONE,
                                 false)) {
                                 if (rec && !entry.isInternal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
index d6b92a5..42c2914 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java
@@ -1878,6 +1878,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite
                         try {
                             if (entry.initialValue(info.value(),
                                 info.version(),
+                                info,
                                 info.ttl(),
                                 info.expireTime(),
                                 true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
index fe216a0..7660fa2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtForceKeysFuture.java
@@ -537,6 +537,7 @@ public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Objec
                         if (entry.initialValue(
                             info.value(),
                             info.version(),
+                            info,
                             info.ttl(),
                             info.expireTime(),
                             true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 54661ec..15d7047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -803,6 +803,7 @@ public class GridDhtPartitionDemander {
                         if (cached.initialValue(
                             entry.value(),
                             entry.version(),
+                            entry,
                             entry.ttl(),
                             entry.expireTime(),
                             true,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
index e25ace7..0905917 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java
@@ -29,6 +29,7 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
 import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -371,13 +372,16 @@ class GridDhtPartitionSupplier {
 
                             CacheDataRow row = iter.next();
 
-                            GridCacheEntryInfo info = new GridCacheEntryInfo();
+                            GridCacheEntryInfo info = grp.mvccEnabled() ?
+                                new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
 
                             info.key(row.key());
                             info.expireTime(row.expireTime());
                             info.version(row.version());
                             info.value(row.value());
                             info.cacheId(row.cacheId());
+                            info.mvccCoordinatorVersion(row.mvccCoordinatorVersion());
+                            info.mvccCounter(row.mvccCounter());
 
                             if (preloadPred == null || preloadPred.apply(info))
                                 s.addEntry0(part, info, grp.shared(), grp.cacheObjectContext());

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 6ed552a..257a127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -2067,8 +2067,10 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
                     boolean primary = cctx.affinity().primaryByKey(cctx.localNode(), entry.key(), topVer);
 
+                    // TODO IGNITE-3478 (mvcc version).
                     entry.initialValue(e.getValue(),
                         ver,
+                        null,
                         ttl,
                         expiryTime,
                         false,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
index f5309e5..11a854a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java
@@ -643,6 +643,7 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr
     @Override public boolean initialValue(
         CacheObject val,
         GridCacheVersion ver,
+        MvccCoordinatorVersion mvccVer,
         long ttl,
         long expireTime,
         boolean preload,

http://git-wip-us.apache.org/repos/asf/ignite/blob/30421e39/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
index 99ce163..f724afb 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/mvcc/CacheMvccTransactionsTest.java
@@ -80,7 +80,10 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
 
     /** */
-    private static final long DEFAULT_TEST_TIME = 30_000;
+    private static final int DFLT_PARTITION_COUNT = RendezvousAffinityFunction.DFLT_PARTITION_COUNT;
+
+    /** */
+    private static final long DFLT_TEST_TIME = 30_000;
 
     /** */
     private static final int SRVS = 4;
@@ -109,7 +112,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
 
     /** {@inheritDoc} */
     @Override protected long getTestTimeout() {
-        return DEFAULT_TEST_TIME + 60_000;
+        return DFLT_TEST_TIME + 60_000;
     }
 
     /** {@inheritDoc} */
@@ -864,7 +867,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             cacheParts,
             writers,
             readers,
-            DEFAULT_TEST_TIME,
+            DFLT_TEST_TIME,
             null,
             writer,
             reader);
@@ -1095,7 +1098,7 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
             cacheParts,
             writers,
             readers,
-            DEFAULT_TEST_TIME,
+            DFLT_TEST_TIME,
             init,
             writer,
             reader);
@@ -1286,6 +1289,69 @@ public class CacheMvccTransactionsTest extends GridCommonAbstractTest {
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testRebalance1() throws Exception {
+        Ignite srv0 = startGrid(0);
+
+        IgniteCache<Integer, Integer> cache =  (IgniteCache)srv0.createCache(
+            cacheConfiguration(PARTITIONED, FULL_SYNC, 0, DFLT_PARTITION_COUNT));
+
+        Map<Integer, Integer> map;
+        Map<Integer, Integer> resMap;
+
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            map = new HashMap<>();
+
+            for (int i = 0; i < DFLT_PARTITION_COUNT * 3; i++)
+                map.put(i, i);
+
+            cache.putAll(map);
+
+            tx.commit();
+        }
+
+        startGrid(1);
+
+        awaitPartitionMapExchange();
+
+        resMap = cache.getAll(map.keySet());
+
+        assertEquals(map.size(), resMap.size());
+
+        for (int i = 0; i < map.size(); i++)
+            assertEquals(i, (Object)resMap.get(i));
+
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int i = 0; i < DFLT_PARTITION_COUNT * 3; i++)
+                map.put(i, i + 1);
+
+            cache.putAll(map);
+
+            tx.commit();
+        }
+        try (Transaction tx = srv0.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+            for (int i = 0; i < DFLT_PARTITION_COUNT * 3; i++)
+                map.put(i, i + 2);
+
+            cache.putAll(map);
+
+            tx.commit();
+        }
+
+        startGrid(2);
+
+        awaitPartitionMapExchange();
+
+        resMap = cache.getAll(map.keySet());
+
+        assertEquals(map.size(), map.size());
+
+        for (int i = 0; i < map.size(); i++)
+            assertEquals(i + 2, (Object)resMap.get(i));
+    }
+
+    /**
      * @param N Number of object to update in single transaction.
      * @param srvs Number of server nodes.
      * @param clients Number of client nodes.


Mime
View raw message