ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From akuznet...@apache.org
Subject [01/28] ignite git commit: IGNITE-3618: Client can not load data after server restarts. This closes #941.
Date Mon, 22 Aug 2016 03:26:33 GMT
Repository: ignite
Updated Branches:
  refs/heads/master f0c3b343e -> 2d4360707


IGNITE-3618: Client can not load data after server restarts. This closes #941.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d0cbb45
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d0cbb45
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d0cbb45

Branch: refs/heads/master
Commit: 1d0cbb45cd61c5c8e6ec926d7e629eb94111b32f
Parents: ff3e00c
Author: vd-pyatkov <vpyatkov@gridgain.com>
Authored: Thu Aug 11 08:43:50 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Thu Aug 11 08:43:50 2016 +0300

----------------------------------------------------------------------
 .../ignite/internal/binary/BinaryContext.java   |   7 +
 .../binary/CacheObjectBinaryProcessorImpl.java  |  19 ++
 .../ClientReconnectAfterClusterRestartTest.java | 225 +++++++++++++++++++
 .../IgniteCacheWithIndexingTestSuite.java       |   5 +-
 4 files changed, 255 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index d78c126..a603894 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -1252,6 +1252,13 @@ public class BinaryContext {
     }
 
     /**
+     * Unregister all binary schemas.
+     */
+    public void unregisterBinarySchemas() {
+        schemas = null;
+    }
+
+    /**
      * Returns instance of {@link OptimizedMarshaller}.
      *
      * @return Optimized marshaller.

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 6d980a8..0337874 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -50,6 +50,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.cluster.ClusterTopologyException;
 import org.apache.ignite.configuration.BinaryConfiguration;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.events.Event;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteNodeAttributes;
 import org.apache.ignite.internal.binary.BinaryContext;
@@ -67,6 +68,7 @@ import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
@@ -103,6 +105,7 @@ import org.jsr166.ConcurrentHashMap8;
 
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
+import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
 
 /**
  * Binary processor implementation.
@@ -146,6 +149,13 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     @GridToStringExclude
     private IgniteBinary binaries;
 
+    /** Listener removes all registred binary schemas after the local client reconnected. */
+    private final GridLocalEventListener clientDisconLsnr = new GridLocalEventListener() {
+        @Override public void onEvent(Event evt) {
+            binaryContext().unregisterBinarySchemas();
+        }
+    };
+
     /** Metadata updates collected before metadata cache is initialized. */
     private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
 
@@ -165,6 +175,9 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     /** {@inheritDoc} */
     @Override public void start() throws IgniteCheckedException {
         if (marsh instanceof BinaryMarshaller) {
+            if (ctx.clientNode())
+                ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED);
+
             BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() {
                 @Override public void addMeta(int typeId, BinaryType newMeta) throws BinaryObjectException {
                     assert newMeta != null;
@@ -252,6 +265,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        if (ctx.clientNode())
+            ctx.event().removeLocalEventListener(clientDisconLsnr);
+    }
+
+    /** {@inheritDoc} */
     @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
         if (clientNode && !ctx.isDaemon()) {
             ctx.continuous().registerStaticRoutine(

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
new file mode 100644
index 0000000..b31447c
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/ClientReconnectAfterClusterRestartTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMemoryMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.QueryIndex;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.events.EventType;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+/**
+ */
+public class ClientReconnectAfterClusterRestartTest extends GridCommonAbstractTest {
+    /** Client id. */
+    public static final int CLIENT_ID = 1;
+
+    /** Cache params. */
+    public static final String CACHE_PARAMS = "PPRB_PARAMS";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.setMarshaller(new BinaryMarshaller());
+        cfg.setIncludeEventTypes(EventType.EVTS_CACHE);
+
+        if (getTestGridName(CLIENT_ID).equals(gridName))
+            cfg.setClientMode(true);
+        else {
+            CacheConfiguration ccfg = getCacheConfiguration();
+
+            cfg.setCacheConfiguration(ccfg);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * @return CacheConfiguration Cache configuration.
+     */
+    @NotNull private CacheConfiguration getCacheConfiguration() {
+        CacheConfiguration ccfg = defaultCacheConfiguration();
+
+        ccfg.setName(CACHE_PARAMS);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+        ccfg.setCacheMode(CacheMode.PARTITIONED);
+        ccfg.setMemoryMode(CacheMemoryMode.OFFHEAP_TIERED);
+
+        List<QueryEntity> queryEntities = new ArrayList<>();
+
+        QueryEntity entity = new QueryEntity();
+
+        entity.setValueType("Params");
+        entity.setKeyType("java.lang.Long");
+
+        LinkedHashMap<String, String> fields = new LinkedHashMap<>();
+
+        fields.put("ID", "java.lang.Long" );
+        fields.put("PARTITIONID", "java.lang.Long");
+        fields.put("CLIENTID", "java.lang.Long");
+        fields.put("PARAMETRCODE", "java.lang.Long");
+        fields.put("PARAMETRVALUE", "java.lang.Object");
+        fields.put("PARENTID", "java.lang.Long");
+
+        entity.setFields(fields);
+
+        List<QueryIndex> indexes = new ArrayList<>();
+
+        indexes.add(new QueryIndex("CLIENTID"));
+        indexes.add(new QueryIndex("ID"));
+        indexes.add(new QueryIndex("PARENTID"));
+
+        entity.setIndexes(indexes);
+
+        queryEntities.add(entity);
+
+        ccfg.setQueryEntities(queryEntities);
+        return ccfg;
+    }
+
+    /** */
+    public void testReconnectClient() throws Exception {
+        try {
+            Ignite igniteSrv = startGrid(0);
+
+            Ignite client = startGrid(1);
+
+            checkTopology(2);
+
+            client.events().localListen(new IgnitePredicate<Event>() {
+
+                @Override public boolean apply(Event event) {
+                    switch (event.type()) {
+                        case EventType.EVT_CLIENT_NODE_DISCONNECTED:
+                            info("Client disconnected");
+
+                            break;
+                        case EventType.EVT_CLIENT_NODE_RECONNECTED:
+                            info("Client reconnected");
+                    }
+
+                    return true;
+                }
+            }, EventType.EVT_CLIENT_NODE_DISCONNECTED, EventType.EVT_CLIENT_NODE_RECONNECTED);
+
+            IgniteDataStreamer<Long, BinaryObject> streamer = client.dataStreamer(CACHE_PARAMS);
+
+            streamer.allowOverwrite(true);
+            streamer.keepBinary(true);
+            streamer.perNodeBufferSize(10000);
+            streamer.perNodeParallelOperations(100);
+
+            BinaryObjectBuilder builder = client.binary().builder("PARAMS");
+
+            builder.setField("ID", 1L);
+            builder.setField("PARTITIONID", 1L);
+            builder.setField("CLIENTID", 1L);
+            builder.setField("PARAMETRCODE", 1L);
+            builder.setField("PARAMETRVALUE", "Test value");
+            builder.setField("PARENTID", 1L);
+            BinaryObject obj = builder.build();
+
+            streamer.addData(1L, obj);
+            streamer.flush();
+
+            stopAllServers(false);
+
+            Thread.sleep(2_000);
+
+            igniteSrv = startGrid(0);
+
+            Thread.sleep(2_000);
+
+            checkTopology(2);
+
+            info("Pre-insert");
+
+            streamer = client.dataStreamer("PPRB_PARAMS");
+            streamer.allowOverwrite(true);
+            streamer.keepBinary(true);
+            streamer.perNodeBufferSize(10000);
+            streamer.perNodeParallelOperations(100);
+
+            IgniteCache<Long, BinaryObject> cache = client.getOrCreateCache(CACHE_PARAMS).withKeepBinary();
+
+            builder = client.binary().builder("PARAMS");
+            builder.setField("ID", 2L);
+            builder.setField("PARTITIONID", 1L);
+            builder.setField("CLIENTID", 1L);
+            builder.setField("PARAMETRCODE", 1L);
+            builder.setField("PARAMETRVALUE", "Test value");
+            builder.setField("PARENTID", 1L);
+            obj = builder.build();
+
+            //streamer.addData(2L, obj);
+            cache.put(2L, obj);
+
+            builder = client.binary().builder("PARAMS");
+            builder.setField("ID", 3L);
+            builder.setField("PARTITIONID", 1L);
+            builder.setField("CLIENTID", 1L);
+            builder.setField("PARAMETRCODE", 1L);
+            builder.setField("PARAMETRVALUE", "Test value");
+            builder.setField("PARENTID", 1L);
+            obj = builder.build();
+
+            //streamer.addData(3L, obj);
+            cache.put(3L, obj);
+
+            builder = client.binary().builder("PARAMS");
+            builder.setField("ID", 4L);
+            builder.setField("PARTITIONID", 1L);
+            builder.setField("CLIENTID", 1L);
+            builder.setField("PARAMETRCODE", 1L);
+            builder.setField("PARAMETRVALUE", "Test value");
+            builder.setField("PARENTID", 1L);
+            obj = builder.build();
+
+            cache.put(4L, obj);
+
+            info("Post-insert");
+
+            obj = cache.get(4L);
+
+            assertNotNull(obj);
+
+            info("End");
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/1d0cbb45/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
index 96e8551..4528b30 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheWithIndexingTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.processors.cache.CacheConfigurationP2PTest;
 import org.apache.ignite.internal.processors.cache.CacheIndexStreamerTest;
 import org.apache.ignite.internal.processors.cache.CacheOperationsWithExpirationTest;
 import org.apache.ignite.internal.processors.cache.CacheRandomOperationsMultithreadedTest;
+import org.apache.ignite.internal.processors.cache.ClientReconnectAfterClusterRestartTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapAndSwapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffHeapSelfTest;
 import org.apache.ignite.internal.processors.cache.GridCacheOffheapIndexEntryEvictTest;
@@ -79,6 +80,8 @@ public class IgniteCacheWithIndexingTestSuite extends TestSuite {
         suite.addTestSuite(CacheOperationsWithExpirationTest.class);
         suite.addTestSuite(CacheBinaryKeyConcurrentQueryTest.class);
 
+        suite.addTestSuite(ClientReconnectAfterClusterRestartTest.class);
+
         return suite;
     }
-}
\ No newline at end of file
+}


Mime
View raw message