ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject ignite git commit: ignite-1607 WIP
Date Fri, 23 Oct 2015 07:30:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1607 9f77d2723 -> dce9bd592


ignite-1607 WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dce9bd59
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dce9bd59
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dce9bd59

Branch: refs/heads/ignite-1607
Commit: dce9bd592f774f01cf7d2bc2b2c4253824c7ab2e
Parents: 9f77d27
Author: sboikov <sboikov@gridgain.com>
Authored: Fri Oct 23 10:30:44 2015 +0300
Committer: sboikov <sboikov@gridgain.com>
Committed: Fri Oct 23 10:30:44 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheMapEntry.java     |  38 ++--
 .../dht/atomic/GridDhtAtomicCache.java          |   1 +
 .../cache/version/GridCacheVersionManager.java  |  10 +
 .../datastreamer/DataStreamerImpl.java          |   9 +-
 .../DateStreamerUpdateAfterLoadTest.java        | 184 +++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite.java |   4 +-
 6 files changed, 222 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 621ed99..2111594 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -3216,34 +3216,36 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme
         checkObsolete();
 
         if (curVer == null || curVer.equals(ver)) {
-            GridCacheMvcc mvcc = mvccExtras();
+            if (val != this.val) {
+                GridCacheMvcc mvcc = mvccExtras();
 
-            if (mvcc != null && !mvcc.isEmpty())
-                return null;
+                if (mvcc != null && !mvcc.isEmpty())
+                    return null;
 
-            if (newVer == null)
-                newVer = cctx.versions().next();
+                if (newVer == null)
+                    newVer = cctx.versions().next();
 
-            CacheObject old = rawGetOrUnmarshalUnlocked(false);
+                CacheObject old = rawGetOrUnmarshalUnlocked(false);
 
-            long ttl = ttlExtras();
+                long ttl = ttlExtras();
 
-            long expTime = CU.toExpireTime(ttl);
+                long expTime = CU.toExpireTime(ttl);
 
-            // Detach value before index update.
-            val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
+                // Detach value before index update.
+                val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx);
 
-            if (val != null) {
-                updateIndex(val, expTime, newVer, old);
+                if (val != null) {
+                    updateIndex(val, expTime, newVer, old);
 
-                if (deletedUnlocked())
-                    deletedUnlocked(false);
-            }
+                    if (deletedUnlocked())
+                        deletedUnlocked(false);
+                }
 
-            // Version does not change for load ops.
-            update(val, expTime, ttl, newVer);
+                // Version does not change for load ops.
+                update(val, expTime, ttl, newVer);
 
-            return newVer;
+                return newVer;
+            }
         }
 
         return null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9f5ad3e..3c4a5b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1657,6 +1657,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
                     if (idx != null) {
                         GridDhtCacheEntry entry = entries.get(idx);
+
                         try {
                             GridCacheVersion ver = entry.version();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
index 7a4be0a..87fe515 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/version/GridCacheVersionManager.java
@@ -149,6 +149,16 @@ public class GridCacheVersionManager extends GridCacheSharedManagerAdapter {
     }
 
     /**
+     * Version for entries loaded with isolated streamer, should be less than any version generated
+     * for entries update.
+     *
+     * @return Version for entries loaded with isolated streamer.
+     */
+    public GridCacheVersion nextForIsolatedStreamer() {
+        return next(0, true, false);
+    }
+
+    /**
      * @return Next version based on current topology.
      */
     public GridCacheVersion next() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index bf9dc78..9f07541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -302,6 +302,8 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
         fut = new DataStreamerFuture(this);
 
         publicFut = new IgniteCacheFutureImpl<>(fut);
+
+        ver = ctx.cache().context().versions().nextForIsolatedStreamer();
     }
 
     /**
@@ -1252,7 +1254,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     false,
                     skipStore,
                     rcvr,
-                    ver);
+                    rcvr == ISOLATED_UPDATER ? ver : null);
 
                 fut = ctx.closure().callLocalSafe(job, false);
 
@@ -1288,9 +1290,6 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                         assert rcvr != null;
 
                         updaterBytes = ctx.config().getMarshaller().marshal(rcvr);
-
-                        if (rcvr == ISOLATED_UPDATER)
-                            ver = ctx.cache().context().versions().next();
                     }
 
                     if (topicBytes == null)
@@ -1352,7 +1351,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
                     dep != null ? dep.classLoaderId() : null,
                     dep == null,
                     topVer,
-                    ver);
+                    rcvr == ISOLATED_UPDATER ? ver : null);
 
                 try {
                     ctx.io().send(node, TOPIC_DATASTREAM, req, PUBLIC_POOL);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java
new file mode 100644
index 0000000..fc3e9e0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/datastreamer/DateStreamerUpdateAfterLoadTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.datastreamer;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
+import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class DateStreamerUpdateAfterLoadTest extends GridCommonAbstractTest {
+    /** */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private boolean client;
+
+    /** */
+    private static final int NODES = 4;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGridsMultiThreaded(NODES - 1);
+
+        client = true;
+
+        startGrid(NODES - 1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testUpdateAfterLoad() throws Exception {
+        Ignite ignite0 = ignite(0);
+
+        for (CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) {
+            int key = 0;
+
+            try (IgniteCache<Integer, Integer> cache = ignite0.createCache(ccfg)) {
+                key = testLoadAndUpdate(cache.getName(), key, false);
+
+                testLoadAndUpdate(cache.getName(), key, true);
+
+                ignite0.destroyCache(cache.getName());
+            }
+        }
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @param allowOverwrite Streamer flag.
+     * @return Next key.
+     * @throws Exception If failed.
+     */
+    private int testLoadAndUpdate(String cacheName, int key, boolean allowOverwrite) throws Exception {
+        for (int loadNode = 0; loadNode < NODES; loadNode++) {
+            Ignite loadIgnite = ignite(loadNode);
+
+            for (int updateNode = 0; updateNode < NODES; updateNode++) {
+                try (IgniteDataStreamer<Integer, Integer> streamer = loadIgnite.dataStreamer(cacheName)) {
+                    streamer.allowOverwrite(allowOverwrite);
+
+                    streamer.addData(key, key);
+                }
+
+                Ignite updateIgnite = ignite(updateNode);
+
+                IgniteCache<Integer, Integer> cache = updateIgnite.cache(cacheName);
+
+                if (allowOverwrite)
+                    atomicClockModeDelay(cache);
+
+                updateIgnite.cache(cacheName).put(key, key + 1);
+
+                checkValue(key, key + 1, cacheName);
+
+                key++;
+            }
+        }
+
+        return key;
+    }
+
+    /**
+     * @param key Key.
+     * @param val Value.
+     * @param cacheName Cache name.
+     */
+    private void checkValue(Integer key, Integer val, String cacheName) {
+        for (int i = 0; i < NODES; i++) {
+            IgniteCache<Integer, Integer> cache = ignite(i).cache(cacheName);
+
+            assertEquals("Unexpected value " + i, val, cache.get(key));
+        }
+    }
+
+    /**
+     * @return Cache configurations to test.
+     */
+    private List<CacheConfiguration<Integer, Integer>> cacheConfigurations() {
+        List<CacheConfiguration<Integer, Integer>> ccfgs = new ArrayList<>();
+
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 1, "cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, PRIMARY, 0, "cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.ATOMIC, CLOCK, 1, "cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 1, "cache-" + ccfgs.size()));
+        ccfgs.add(cacheConfiguration(CacheAtomicityMode.TRANSACTIONAL, null, 0, "cache-" + ccfgs.size()));
+
+        return ccfgs;
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param writeOrderMode Cache write order mode.
+     * @param backups Number of backups.
+     * @param name Cache name.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration<Integer, Integer> cacheConfiguration(CacheAtomicityMode atomicityMode,
+        CacheAtomicWriteOrderMode writeOrderMode,
+        int backups,
+        String name) {
+        CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+        ccfg.setName(name);
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setBackups(backups);
+        ccfg.setAtomicWriteOrderMode(writeOrderMode);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+
+        return ccfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dce9bd59/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 1deb3bc..8f719cf 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -122,6 +122,7 @@ import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessorSel
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerImplSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultiThreadedSelfTest;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerMultinodeCreateCacheTest;
+import org.apache.ignite.internal.processors.datastreamer.DateStreamerUpdateAfterLoadTest;
 import org.apache.ignite.testframework.GridTestUtils;
 
 /**
@@ -213,7 +214,8 @@ public class IgniteCacheTestSuite extends TestSuite {
         suite.addTestSuite(GridCacheAffinityApiSelfTest.class);
         suite.addTestSuite(GridCacheStoreValueBytesSelfTest.class);
         GridTestUtils.addTestIfNeeded(suite, DataStreamProcessorSelfTest.class, ignoredTests);
-        suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
+        GridTestUtils.addTestIfNeeded(suite, DateStreamerUpdateAfterLoadTest.class, ignoredTests);
+            suite.addTestSuite(DataStreamerMultiThreadedSelfTest.class);
         suite.addTestSuite(DataStreamerMultinodeCreateCacheTest.class);
         suite.addTestSuite(DataStreamerImplSelfTest.class);
         GridTestUtils.addTestIfNeeded(suite, GridCacheEntryMemorySizeSelfTest.class, ignoredTests);


Mime
View raw message