Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2BA8B200C0C for ; Thu, 22 Dec 2016 16:15:47 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2A78B160B35; Thu, 22 Dec 2016 15:15:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7DD4C160B57 for ; Thu, 22 Dec 2016 16:15:43 +0100 (CET) Received: (qmail 60614 invoked by uid 500); 22 Dec 2016 15:15:42 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 59729 invoked by uid 99); 22 Dec 2016 15:15:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 22 Dec 2016 15:15:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 34220F1705; Thu, 22 Dec 2016 15:15:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 22 Dec 2016 15:16:19 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] ignite git commit: ignite-2412 Do not call 'asyncOp' for synchronous operations archived-at: Thu, 22 Dec 2016 15:15:47 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index a8219b0..4350b3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -450,61 +450,11 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { } /** {@inheritDoc} */ - @Override public V getAndPutIfAbsent(K key, V val) throws IgniteCheckedException { - return dht.getAndPutIfAbsent(key, val); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture getAndPutIfAbsentAsync(K key, V val) { - return dht.getAndPutIfAbsentAsync(key, val); - } - - /** {@inheritDoc} */ - @Override public boolean putIfAbsent(K key, V val) throws IgniteCheckedException { - return dht.putIfAbsent(key, val); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture putIfAbsentAsync(K key, V val) { - return dht.putIfAbsentAsync(key, val); - } - - /** {@inheritDoc} */ @Nullable @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { return dht.tryGetAndPut(key, val); } /** {@inheritDoc} */ - @Override public V getAndReplace(K key, V val) throws IgniteCheckedException { - return dht.getAndReplace(key, val); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture getAndReplaceAsync(K key, V val) { - return dht.getAndReplaceAsync(key, val); - } - - /** {@inheritDoc} */ - @Override public boolean replace(K key, V val) throws IgniteCheckedException { - return dht.replace(key, val); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture replaceAsync(K key, V val) { - return dht.replaceAsync(key, val); - } - - /** {@inheritDoc} */ - @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException { - return dht.replace(key, oldVal, newVal); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture replaceAsync(K key, V oldVal, V newVal) { - return dht.replaceAsync(key, oldVal, newVal); - } - - /** {@inheritDoc} */ @Override public void putAll(Map m) throws IgniteCheckedException { dht.putAll(m); @@ -569,6 +519,11 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { } /** {@inheritDoc} */ + @Override public boolean remove(K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { + return dht.remove(key, filter); + } + + /** {@inheritDoc} */ @Override public V getAndRemove(K key) throws IgniteCheckedException { return dht.getAndRemove(key); } @@ -602,16 +557,6 @@ public class GridNearAtomicCache extends GridNearCacheAdapter { } /** {@inheritDoc} */ - @Override public boolean remove(K key, V val) throws IgniteCheckedException { - return dht.remove(key, val); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAsync(K key, V val) { - return dht.removeAsync(key, val); - } - - /** {@inheritDoc} */ @Override public void removeAll() throws IgniteCheckedException { dht.removeAll(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index bc16ff4..a26d2f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -58,7 +58,6 @@ import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.resource.GridResourceIoc; -import org.apache.ignite.internal.processors.resource.GridResourceProcessor; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; @@ -108,6 +107,11 @@ public class GridLocalAtomicCache extends GridLocalCache { } /** {@inheritDoc} */ + @Override protected void checkJta() throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ @Override public boolean isLocal() { return true; } @@ -119,9 +123,7 @@ public class GridLocalAtomicCache extends GridLocalCache { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public V getAndPut(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - + @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { CacheOperationContext opCtx = ctx.operationContextPerCall(); return (V)updateAllInternal(UPDATE, @@ -138,16 +140,10 @@ public class GridLocalAtomicCache extends GridLocalCache { } /** {@inheritDoc} */ - @Override public boolean put(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - long start = statsEnabled ? System.nanoTime() : 0L; - + @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { CacheOperationContext opCtx = ctx.operationContextPerCall(); - boolean res = (Boolean)updateAllInternal(UPDATE, + Boolean res = (Boolean)updateAllInternal(UPDATE, Collections.singleton(key), Collections.singleton(val), null, @@ -159,8 +155,7 @@ public class GridLocalAtomicCache extends GridLocalCache { ctx.readThrough(), opCtx != null && opCtx.isKeepBinary()); - if (statsEnabled) - metrics0().addPutTimeNanos(System.nanoTime() - start); + assert res != null; return res; } @@ -168,8 +163,6 @@ public class GridLocalAtomicCache extends GridLocalCache { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - return updateAllAsync0(F0.asMap(key, val), null, null, @@ -181,8 +174,6 @@ public class GridLocalAtomicCache extends GridLocalCache { /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public IgniteInternalFuture putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key", val, "val"); - return updateAllAsync0(F0.asMap(key, val), null, null, @@ -192,65 +183,7 @@ public class GridLocalAtomicCache extends GridLocalCache { } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public V getAndPutIfAbsent(K key, V val) throws IgniteCheckedException { - return getAndPut(key, val, ctx.noVal()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture getAndPutIfAbsentAsync(K key, V val) { - return getAndPutAsync(key, val, ctx.noVal()); - } - - /** {@inheritDoc} */ - @Override public boolean putIfAbsent(K key, V val) throws IgniteCheckedException { - return put(key, val, ctx.noVal()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture putIfAbsentAsync(K key, V val) { - return putAsync(key, val, ctx.noVal()); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public V getAndReplace(K key, V val) throws IgniteCheckedException { - return getAndPut(key, val, ctx.hasVal()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture getAndReplaceAsync(K key, V val) { - return getAndPutAsync(key, val, ctx.hasVal()); - } - - /** {@inheritDoc} */ - @Override public boolean replace(K key, V val) throws IgniteCheckedException { - return put(key, val, ctx.hasVal()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture replaceAsync(K key, V val) { - return putAsync(key, val, ctx.hasVal()); - } - - /** {@inheritDoc} */ - @Override public boolean replace(K key, V oldVal, V newVal) throws IgniteCheckedException { - A.notNull(oldVal, "oldVal"); - - return put(key, newVal, ctx.equalsVal(oldVal)); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture replaceAsync(K key, V oldVal, V newVal) { - return putAsync(key, newVal, ctx.equalsVal(oldVal)); - } - - /** {@inheritDoc} */ - @Override public void putAll(Map m) throws IgniteCheckedException { - boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - long start = statsEnabled ? System.nanoTime() : 0L; - + @Override protected void putAll0(Map m) throws IgniteCheckedException { CacheOperationContext opCtx = ctx.operationContextPerCall(); updateAllInternal(UPDATE, @@ -264,13 +197,10 @@ public class GridLocalAtomicCache extends GridLocalCache { ctx.writeThrough(), ctx.readThrough(), opCtx != null && opCtx.isKeepBinary()); - - if (statsEnabled) - metrics0().addPutTimeNanos(System.nanoTime() - start); } /** {@inheritDoc} */ - @Override public IgniteInternalFuture putAllAsync(Map m) { + @Override public IgniteInternalFuture putAllAsync0(Map m) { return updateAllAsync0(m, null, null, @@ -280,8 +210,7 @@ public class GridLocalAtomicCache extends GridLocalCache { } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public V getAndRemove(K key) throws IgniteCheckedException { + @Override protected V getAndRemove0(K key) throws IgniteCheckedException { CacheOperationContext opCtx = ctx.operationContextPerCall(); return (V)updateAllInternal(DELETE, @@ -299,13 +228,13 @@ public class GridLocalAtomicCache extends GridLocalCache { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture getAndRemoveAsync(K key) { + @Override public IgniteInternalFuture getAndRemoveAsync0(K key) { return removeAllAsync0(Collections.singletonList(key), true, false, null); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public void removeAll(Collection keys) throws IgniteCheckedException { + @Override public void removeAll0(Collection keys) throws IgniteCheckedException { CacheOperationContext opCtx = ctx.operationContextPerCall(); updateAllInternal(DELETE, @@ -322,19 +251,13 @@ public class GridLocalAtomicCache extends GridLocalCache { } /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAllAsync(Collection keys) { + @Override public IgniteInternalFuture removeAllAsync0(Collection keys) { return removeAllAsync0(keys, false, false, null).chain(RET2NULL); } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public boolean remove(K key) throws IgniteCheckedException { - boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - long start = statsEnabled ? System.nanoTime() : 0L; - - A.notNull(key, "key"); - + @Override public boolean remove0(K key, final CacheEntryPredicate filter) throws IgniteCheckedException { CacheOperationContext opCtx = ctx.operationContextPerCall(); Boolean rmv = (Boolean)updateAllInternal(DELETE, @@ -344,50 +267,23 @@ public class GridLocalAtomicCache extends GridLocalCache { expiryPerCall(), false, false, - null, + filter, ctx.writeThrough(), ctx.readThrough(), opCtx != null && opCtx.isKeepBinary()); - if (statsEnabled && rmv) - metrics0().addRemoveTimeNanos(System.nanoTime() - start); + assert rmv != null; return rmv; } /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public IgniteInternalFuture removeAsync(K key, @Nullable CacheEntryPredicate filter) { - A.notNull(key, "key"); - + @Override public IgniteInternalFuture removeAsync0(K key, @Nullable CacheEntryPredicate filter) { return removeAllAsync0(Collections.singletonList(key), false, false, filter); } /** {@inheritDoc} */ - @Override public boolean remove(K key, V val) throws IgniteCheckedException { - A.notNull(key, "key", val, "val"); - - CacheOperationContext opCtx = ctx.operationContextPerCall(); - - return (Boolean)updateAllInternal(DELETE, - Collections.singleton(key), - null, - null, - expiryPerCall(), - false, - false, - ctx.equalsVal(val), - ctx.writeThrough(), - ctx.readThrough(), - opCtx != null && opCtx.isKeepBinary()); - } - - /** {@inheritDoc} */ - @Override public IgniteInternalFuture removeAsync(K key, V val) { - return removeAsync(key, ctx.equalsVal(val)); - } - - /** {@inheritDoc} */ @Override public IgniteInternalFuture removeAllAsync() { return ctx.closures().callLocalSafe(new Callable() { @Override public Void call() throws Exception { @@ -399,11 +295,13 @@ public class GridLocalAtomicCache extends GridLocalCache { } /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override @Nullable public V get(K key, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { - String taskName = ctx.kernalContext().job().currentTaskName(); - + @Override protected V get0( + final K key, + String taskName, + boolean deserializeBinary, + boolean needVer) throws IgniteCheckedException + { Map m = getAllInternal(Collections.singleton(key), ctx.isSwapOrOffheapEnabled(), ctx.readThrough(), @@ -419,7 +317,7 @@ public class GridLocalAtomicCache extends GridLocalCache { /** {@inheritDoc} */ @SuppressWarnings("unchecked") - @Override public final Map getAll(Collection keys, boolean deserializeBinary, boolean needVer) + @Override public final Map getAll0(Collection keys, boolean deserializeBinary, boolean needVer) throws IgniteCheckedException { A.notNull(keys, "keys"); @@ -794,7 +692,7 @@ public class GridLocalAtomicCache extends GridLocalCache { final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - IgniteInternalFuture fut = asyncOp(new Callable() { + return asyncOp(new Callable() { @Override public Object call() throws Exception { return updateAllInternal(op, keys, @@ -809,11 +707,6 @@ public class GridLocalAtomicCache extends GridLocalCache { keepBinary); } }); - - if (ctx.config().isStatisticsEnabled()) - fut.listen(new UpdatePutTimeStatClosure(metrics0(), System.nanoTime())); - - return fut; } /** @@ -835,17 +728,13 @@ public class GridLocalAtomicCache extends GridLocalCache { final boolean readThrough = ctx.readThrough(); - final boolean statsEnabled = ctx.config().isStatisticsEnabled(); - - final long start = statsEnabled ? System.nanoTime() : 0L; - final ExpiryPolicy expiryPlc = expiryPerCall(); CacheOperationContext opCtx = ctx.operationContextPerCall(); final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); - IgniteInternalFuture fut = asyncOp(new Callable() { + return asyncOp(new Callable() { @Override public Object call() throws Exception { return updateAllInternal(DELETE, keys, @@ -860,11 +749,6 @@ public class GridLocalAtomicCache extends GridLocalCache { keepBinary); } }); - - if (statsEnabled) - fut.listen(new UpdateRemoveTimeStatClosure<>(metrics0(), start)); - - return fut; } /** @@ -1584,10 +1468,7 @@ public class GridLocalAtomicCache extends GridLocalCache { * @return Future. */ @SuppressWarnings("unchecked") - protected IgniteInternalFuture asyncOp(final Callable op) { - if (!asyncToggled) - return ctx.closures().callLocalSafe(op); - + private IgniteInternalFuture asyncOp(final Callable op) { IgniteInternalFuture fail = asyncOpAcquire(); if (fail != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java index 3e3b84e..648134e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest.java @@ -34,7 +34,7 @@ public class CachePartitionedNearEnabledMultiNodeLongTxTimeoutFullApiTest extend @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); - cfg.getTransactionConfiguration().setDefaultTxTimeout(Long.MAX_VALUE); + cfg.getTransactionConfiguration().setDefaultTxTimeout(5 * 60_000); return cfg; } http://git-wip-us.apache.org/repos/asf/ignite/blob/ad785cbd/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java index ec3b808..57c709b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsBlockMessageSystemPoolStarvationSelfTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.igfs; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteFileSystem; @@ -39,14 +41,11 @@ import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.transactions.TransactionConcurrency; -import org.apache.ignite.transactions.TransactionIsolation; - -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; /** * Test to check for system pool starvation due to {@link IgfsBlocksMessage}. @@ -125,8 +124,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst @Override public Void call() throws Exception { GridCacheAdapter dataCache = dataCache(attacker); - try (IgniteInternalTx tx = - dataCache.txStartEx(TransactionConcurrency.PESSIMISTIC, TransactionIsolation.REPEATABLE_READ)) { + try (IgniteInternalTx tx = dataCache.txStartEx(PESSIMISTIC, REPEATABLE_READ)) { dataCache.put(DATA_KEY, 0); txStartLatch.countDown(); @@ -185,6 +183,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst * Create IGFS file asynchronously. * * @param path Path. + * @param writeStartLatch Write start latch. * @return Future. */ private IgniteInternalFuture createFileAsync(final IgfsPath path, final CountDownLatch writeStartLatch) { @@ -265,6 +264,7 @@ public class IgfsBlockMessageSystemPoolStarvationSelfTest extends IgfsCommonAbst cfg.setLocalHost("127.0.0.1"); cfg.setConnectorConfiguration(null); + cfg.setStripedPoolSize(0); cfg.setSystemThreadPoolSize(2); cfg.setRebalanceThreadPoolSize(1); cfg.setPublicThreadPoolSize(1);