Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D040618912 for ; Mon, 18 Jan 2016 08:23:48 +0000 (UTC) Received: (qmail 1151 invoked by uid 500); 18 Jan 2016 08:23:48 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 1065 invoked by uid 500); 18 Jan 2016 08:23:48 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 460 invoked by uid 99); 18 Jan 2016 08:23:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Jan 2016 08:23:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A93B0E054A; Mon, 18 Jan 2016 08:23:47 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Mon, 18 Jan 2016 08:23:58 -0000 Message-Id: <22d3f4be68724c489784b3bc239b2488@git.apache.org> In-Reply-To: <012e89d01bbf4f65acc3ba98e4dc3598@git.apache.org> References: <012e89d01bbf4f65acc3ba98e4dc3598@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/29] ignite git commit: ignite-2359 Added locking for files used by MarshallerContextImpl. ignite-2359 Added locking for files used by MarshallerContextImpl. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1d8c4e25 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1d8c4e25 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1d8c4e25 Branch: refs/heads/ignite-2236 Commit: 1d8c4e259dfe17611289cfac70bf1c3b351073cd Parents: 59a893c Author: sboikov Authored: Wed Jan 13 08:56:34 2016 +0300 Committer: sboikov Committed: Wed Jan 13 08:59:39 2016 +0300 ---------------------------------------------------------------------- .../ignite/internal/MarshallerContextImpl.java | 86 +++++++-- ...eMarshallerCacheConcurrentReadWriteTest.java | 189 +++++++++++++++++++ .../dht/GridCacheTxNodeFailureSelfTest.java | 6 +- .../ignite/testsuites/IgniteBasicTestSuite.java | 2 + 4 files changed, 264 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1d8c4e25/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java index c7fa902..e3f2bc9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java @@ -19,12 +19,16 @@ package org.apache.ignite.internal; import java.io.BufferedReader; import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.io.Writer; +import java.nio.channels.FileLock; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.Lock; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryListenerException; import javax.cache.event.CacheEntryUpdatedListener; @@ -33,6 +37,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; +import org.apache.ignite.internal.util.GridStripedLock; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.PluginProvider; @@ -42,6 +47,9 @@ import org.apache.ignite.plugin.PluginProvider; */ public class MarshallerContextImpl extends MarshallerContextAdapter { /** */ + private static final GridStripedLock fileLock = new GridStripedLock(32); + + /** */ private final CountDownLatch latch = new CountDownLatch(1); /** */ @@ -72,7 +80,7 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { */ public void onMarshallerCacheStarted(GridKernalContext ctx) throws IgniteCheckedException { ctx.cache().marshallerCache().context().continuousQueries().executeInternalQuery( - new ContinuousQueryListener(log, workDir), + new ContinuousQueryListener(ctx.log(MarshallerContextImpl.class), workDir), null, ctx.cache().marshallerCache().context().affinityNode(), true @@ -149,14 +157,31 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { String clsName = cache0.getTopologySafe(id); if (clsName == null) { - File file = new File(workDir, id + ".classname"); + String fileName = id + ".classname"; + + Lock lock = fileLock(fileName); + + lock.lock(); + + try { + File file = new File(workDir, fileName); - try (BufferedReader reader = new BufferedReader(new FileReader(file))) { - clsName = reader.readLine(); + try (FileInputStream in = new FileInputStream(file)) { + FileLock fileLock = in.getChannel().lock(0L, Long.MAX_VALUE, true); + + assert fileLock != null : fileName; + + try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) { + clsName = reader.readLine(); + } + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to read class name from file [id=" + id + + ", file=" + file.getAbsolutePath() + ']', e); + } } - catch (IOException e) { - throw new IgniteCheckedException("Failed to read class name from file [id=" + id + - ", file=" + file.getAbsolutePath() + ']', e); + finally { + lock.unlock(); } // Must explicitly put entry to cache to invoke other continuous queries. @@ -167,6 +192,14 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { } /** + * @param fileName File name. + * @return Lock instance. + */ + private static Lock fileLock(String fileName) { + return fileLock.getLock(fileName.hashCode()); + } + + /** */ private static class ContinuousQueryListener implements CacheEntryUpdatedListener { /** */ @@ -185,23 +218,40 @@ public class MarshallerContextImpl extends MarshallerContextAdapter { } /** {@inheritDoc} */ - @Override public void onUpdated(Iterable> events) + @Override public void onUpdated(Iterable> evts) throws CacheEntryListenerException { - for (CacheEntryEvent evt : events) { + for (CacheEntryEvent evt : evts) { assert evt.getOldValue() == null || F.eq(evt.getOldValue(), evt.getValue()): "Received cache entry update for system marshaller cache: " + evt; if (evt.getOldValue() == null) { - File file = new File(workDir, evt.getKey() + ".classname"); + String fileName = evt.getKey() + ".classname"; + + Lock lock = fileLock(fileName); + + lock.lock(); + + try { + File file = new File(workDir, fileName); + + try (FileOutputStream out = new FileOutputStream(file)) { + FileLock fileLock = out.getChannel().lock(0L, Long.MAX_VALUE, false); + + assert fileLock != null : fileName; - try (Writer writer = new FileWriter(file)) { - writer.write(evt.getValue()); + try (Writer writer = new OutputStreamWriter(out)) { + writer.write(evt.getValue()); - writer.flush(); + writer.flush(); + } + } + catch (IOException e) { + U.error(log, "Failed to write class name to file [id=" + evt.getKey() + + ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e); + } } - catch (IOException e) { - U.error(log, "Failed to write class name to file [id=" + evt.getKey() + - ", clsName=" + evt.getValue() + ", file=" + file.getAbsolutePath() + ']', e); + finally { + lock.unlock(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d8c4e25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java new file mode 100644 index 0000000..ad6f604 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteMarshallerCacheConcurrentReadWriteTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheRebalanceMode.SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteMarshallerCacheConcurrentReadWriteTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPeerClassLoadingEnabled(false); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setCacheMode(REPLICATED); + ccfg.setRebalanceMode(SYNC); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + + cfg.setCacheConfiguration(ccfg); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + super.afterTest(); + } + + /** + * @throws Exception If failed. + */ + public void testConcurrentReadWrite() throws Exception { + Ignite ignite = startGrid(0); + + Map data = new HashMap<>(); + + final Map dataBytes = new HashMap<>(); + + for (int i = 0; i < 10; i++) { + Object obj = null; + + switch (i % 10) { + case 0: obj = new TestClass1(); break; + case 1: obj = new TestClass2(); break; + case 2: obj = new TestClass3(); break; + case 3: obj = new TestClass4(); break; + case 4: obj = new TestClass5(); break; + case 5: obj = new TestClass6(); break; + case 6: obj = new TestClass7(); break; + case 7: obj = new TestClass8(); break; + case 8: obj = new TestClass9(); break; + case 9: obj = new TestClass10(); break; + default: fail(); + } + + data.put(i, obj); + + dataBytes.put(i, ignite.configuration().getMarshaller().marshal(obj)); + } + + ignite.cache(null).putAll(data); + + stopGrid(0); + + for (int i = 0; i < 3; i++) { + log.info("Iteration: " + i); + + final AtomicInteger idx = new AtomicInteger(); + + GridTestUtils.runMultiThreaded(new Callable() { + @Override public Void call() throws Exception { + int node = idx.getAndIncrement(); + + Ignite ignite = startGrid(node); + + IgniteCache cache = ignite.cache(null); + + for (Map.Entry e : dataBytes.entrySet()) { + Object obj = ignite.configuration().getMarshaller().unmarshal(e.getValue(), null); + + cache.put(e.getKey(), obj); + } + + ignite.cache(null).getAll(dataBytes.keySet()); + + return null; + } + }, 10, "test-thread"); + + stopAllGrids(); + } + } + + /** + * + */ + static class TestClass1 implements Serializable { } + + /** + * + */ + static class TestClass2 implements Serializable { } + + /** + * + */ + static class TestClass3 implements Serializable { } + + /** + * + */ + static class TestClass4 implements Serializable { } + + /** + * + */ + static class TestClass5 implements Serializable { } + + /** + * + */ + static class TestClass6 implements Serializable { } + + /** + * + */ + static class TestClass7 implements Serializable { } + + /** + * + */ + static class TestClass8 implements Serializable { } + + /** + * + */ + static class TestClass9 implements Serializable { } + + /** + * + */ + static class TestClass10 implements Serializable { } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1d8c4e25/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java index 78e7672..84838db 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCacheTxNodeFailureSelfTest.java @@ -78,7 +78,11 @@ public class GridCacheTxNodeFailureSelfTest extends GridCommonAbstractTest { cfg.setCacheConfiguration(cacheConfiguration(gridName)); - cfg.setCommunicationSpi(new BanningCommunicationSpi()); + BanningCommunicationSpi commSpi = new BanningCommunicationSpi(); + + commSpi.setSharedMemoryPort(-1); + + cfg.setCommunicationSpi(commSpi); return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1d8c4e25/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java index 7ee301c..bcd1ede 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBasicTestSuite.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.IgniteSlowClientDetectionSelfTest; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessorRendezvousSelfTest; import org.apache.ignite.internal.processors.cache.GridProjectionForCachesOnDaemonNodeSelfTest; import org.apache.ignite.internal.processors.cache.IgniteDaemonNodeMarshallerCacheTest; +import org.apache.ignite.internal.processors.cache.IgniteMarshallerCacheConcurrentReadWriteTest; import org.apache.ignite.internal.processors.cache.OffHeapTieredTransactionSelfTest; import org.apache.ignite.internal.processors.closure.GridClosureProcessorSelfTest; import org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest; @@ -111,6 +112,7 @@ public class IgniteBasicTestSuite extends TestSuite { suite.addTestSuite(OffHeapTieredTransactionSelfTest.class); suite.addTestSuite(IgniteSlowClientDetectionSelfTest.class); suite.addTestSuite(IgniteDaemonNodeMarshallerCacheTest.class); + suite.addTestSuite(IgniteMarshallerCacheConcurrentReadWriteTest.class); suite.addTestSuite(IgniteExceptionInNioWorkerSelfTest.class);