ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ag...@apache.org
Subject ignite git commit: ignite-6214 resolve problem with concurrent metadata updates
Date Mon, 04 Sep 2017 16:18:24 GMT
Repository: ignite
Updated Branches:
  refs/heads/master a2a902684 -> 740a9b5bd


ignite-6214 resolve problem with concurrent metadata updates


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/740a9b5b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/740a9b5b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/740a9b5b

Branch: refs/heads/master
Commit: 740a9b5bd7344ae099e6359f175759bb1a75835a
Parents: a2a9026
Author: Denis Mekhanikov <dmekhanikov@gmail.com>
Authored: Thu Aug 31 10:32:25 2017 +0300
Committer: Andrey Gura <agura@apache.org>
Committed: Mon Sep 4 19:08:41 2017 +0300

----------------------------------------------------------------------
 .../cache/binary/BinaryMetadataTransport.java   | 14 +++++-
 .../binary/BinaryMetadataUpdatesFlowTest.java   | 48 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/740a9b5b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index 77190a4..010ab0f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -30,6 +30,7 @@ import org.apache.ignite.events.DiscoveryEvent;
 import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -392,7 +394,7 @@ final class BinaryMetadataTransport {
      * @param pendingVer Pending version.
      * @param fut Future.
      */
-    private void initSyncFor(int typeId, int pendingVer, MetadataUpdateResultFuture fut)
{
+    private void initSyncFor(int typeId, int pendingVer, final MetadataUpdateResultFuture
fut) {
         if (stopping) {
             fut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
 
@@ -401,7 +403,15 @@ final class BinaryMetadataTransport {
 
         SyncKey key = new SyncKey(typeId, pendingVer);
 
-        syncMap.put(key, fut);
+        MetadataUpdateResultFuture oldFut = syncMap.putIfAbsent(key, fut);
+
+        if (oldFut != null) {
+            oldFut.listen(new IgniteInClosure<IgniteInternalFuture<MetadataUpdateResult>>()
{
+                @Override public void apply(IgniteInternalFuture<MetadataUpdateResult>
doneFut) {
+                    fut.onDone(doneFut.result(), doneFut.error());
+                }
+            });
+        }
 
         fut.key(key);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/740a9b5b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
index 9ec48cf..3ee51c8 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.ignite.internal.processors.cache.binary;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingDeque;
@@ -25,6 +27,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.cache.event.CacheEntryListenerException;
 import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.binary.BinaryObjectBuilder;
@@ -36,6 +39,7 @@ import org.apache.ignite.cluster.ClusterGroup;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryMarshaller;
 import org.apache.ignite.internal.binary.BinaryObjectImpl;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
@@ -52,6 +56,8 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.eclipse.jetty.util.ConcurrentHashSet;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
 /**
  *
  */
@@ -343,6 +349,48 @@ public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest
{
     }
 
     /**
+     * @throws Exception If failed.
+     */
+    public void testConcurrentMetadataUpdates() throws Exception {
+        startGrid(0);
+
+        final Ignite client = startGrid(getConfiguration("client").setClientMode(true));
+
+        final IgniteCache<Integer, Object> cache = client.cache(DEFAULT_CACHE_NAME).withKeepBinary();
+
+        int threadsNum = 10;
+        final int updatesNum = 2000;
+
+        List<IgniteInternalFuture> futs = new ArrayList<>();
+
+        for (int i = 0; i < threadsNum; i++) {
+            final int threadId = i;
+
+            IgniteInternalFuture fut = runAsync(new Runnable() {
+                @Override public void run() {
+                    try {
+                        for (int j = 0; j < updatesNum; j++) {
+                            BinaryObjectBuilder bob = client.binary().builder(BINARY_TYPE_NAME);
+
+                            bob.setField("field" + j, threadId);
+
+                            cache.put(threadId, bob.build());
+                        }
+                    }
+                    catch (Exception e) {
+                        e.printStackTrace();
+                    }
+                }
+            }, "updater-" + i);
+
+            futs.add(fut);
+        }
+
+        for (IgniteInternalFuture fut : futs)
+            fut.get();
+    }
+
+    /**
      * Runnable responsible for stopping (gracefully) server nodes during metadata updates
process.
      */
     private final class ServerNodeKiller implements Runnable {


Mime
View raw message