Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B7431200C5C for ; Thu, 20 Apr 2017 14:22:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B2DB2160B9F; Thu, 20 Apr 2017 12:22:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0FA68160B91 for ; Thu, 20 Apr 2017 14:22:28 +0200 (CEST) Received: (qmail 72705 invoked by uid 500); 20 Apr 2017 12:22:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 72696 invoked by uid 99); 20 Apr 2017 12:22:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 20 Apr 2017 12:22:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 104DDE110C; Thu, 20 Apr 2017 12:22:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Thu, 20 Apr 2017 12:22:27 -0000 Message-Id: <1cac75c9eea746098196010e061513a7@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/5] ignite git commit: ignite-2893 For datastructures use invoke instead of explicit txs, got rid of unnecessary outTx usage. archived-at: Thu, 20 Apr 2017 12:22:30 -0000 Repository: ignite Updated Branches: refs/heads/ignite-1794 09e342708 -> 45c38bece http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java index 6724f78..e154850 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheLockImpl.java @@ -516,11 +516,10 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo /** * This method is used for synchronizing the reentrant lock state across all nodes. */ - protected boolean compareAndSetGlobalState(final int expVal, final int newVal, + boolean compareAndSetGlobalState(final int expVal, final int newVal, final Thread newThread, final boolean bargingProhibited) { try { - return CU.outTx( - retryTopologySafe(new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); @@ -594,9 +593,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo throw e; } } - }), - ctx - ); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -608,12 +605,11 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo * * @param cancelled true if acquire attempt is cancelled, false if acquire attempt should be registered. */ - protected boolean synchronizeQueue(final boolean cancelled, final Thread thread) { + boolean synchronizeQueue(final boolean cancelled, final Thread thread) { final AtomicBoolean interrupted = new AtomicBoolean(false); try { - return CU.outTx( - retryTopologySafe(new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); @@ -686,9 +682,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo throw e; } } - }), - ctx - ); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -704,13 +698,14 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo * Sets the global state across all nodes after releasing the reentrant lock. * * @param newVal New state. - * @param lastCondition Id of the condition await is called. + * @param lastCond Id of the condition await is called. * @param outgoingSignals Map containing signal calls on this node since the last acquisition of the lock. */ - protected boolean setGlobalState(final int newVal, @Nullable final String lastCondition, final Map outgoingSignals) { + protected boolean setGlobalState(final int newVal, + @Nullable final String lastCond, + final Map outgoingSignals) { try { - return CU.outTx( - retryTopologySafe(new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); @@ -730,9 +725,9 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo // If this lock is fair, remove this node from queue. if (val.isFair() && newVal == 0) { - UUID removedNode = val.getNodes().removeFirst(); + UUID rmvdNode = val.getNodes().removeFirst(); - assert(thisNode.equals(removedNode)); + assert(thisNode.equals(rmvdNode)); } // Get global condition queue. @@ -751,9 +746,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo if (list != null && !list.isEmpty()) { // Check if signalAll was called. - if (cnt == 0) { + if (cnt == 0) cnt = list.size(); - } // Remove from global condition queue. for (int i = 0; i < cnt; i++) { @@ -787,20 +781,20 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo // Check if this release is called after condition.await() call; // If true, add this node to the global waiting queue. - if (lastCondition != null) { + if (lastCond != null) { LinkedList queue; //noinspection IfMayBeConditional - if (!condMap.containsKey(lastCondition)) + if (!condMap.containsKey(lastCond)) // New condition object. queue = new LinkedList<>(); else // Existing condition object. - queue = condMap.get(lastCondition); + queue = condMap.get(lastCond); queue.add(thisNode); - condMap.put(lastCondition, queue); + condMap.put(lastCond, queue); } val.setConditionMap(condMap); @@ -824,16 +818,14 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo throw e; } } - }), - ctx - ); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); } } - protected synchronized boolean checkIncomingSignals(GridCacheLockState state) { + synchronized boolean checkIncomingSignals(GridCacheLockState state) { if (state.getSignals() == null) return false; @@ -882,16 +874,16 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo private final String name; /** */ - private final AbstractQueuedSynchronizer.ConditionObject object; + private final AbstractQueuedSynchronizer.ConditionObject obj; /** * @param name Condition name. - * @param object Condition object. + * @param obj Condition object. */ - protected IgniteConditionObject(String name, ConditionObject object) { + protected IgniteConditionObject(String name, ConditionObject obj) { this.name = name; - this.object = object; + this.obj = obj; } /** @@ -913,7 +905,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo lastCondition = name; - object.await(); + obj.await(); sync.validate(true); } @@ -935,7 +927,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo lastCondition = name; - object.awaitUninterruptibly(); + obj.awaitUninterruptibly(); sync.validate(false); } @@ -954,7 +946,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo lastCondition = name; - long result = object.awaitNanos(nanosTimeout); + long result = obj.awaitNanos(nanosTimeout); sync.validate(true); @@ -978,7 +970,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo lastCondition = name; - boolean result = object.await(time, unit); + boolean result = obj.await(time, unit); sync.validate(true); @@ -1002,7 +994,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo lastCondition = name; - boolean result = object.awaitUntil(deadline); + boolean result = obj.awaitUntil(deadline); sync.validate(true); @@ -1087,8 +1079,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo private void initializeReentrantLock() throws IgniteCheckedException { if (initGuard.compareAndSet(false, true)) { try { - sync = CU.outTx( - retryTopologySafe(new Callable() { + sync = retryTopologySafe(new Callable() { @Override public Sync call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, lockView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheLockState val = lockView.get(key); @@ -1105,9 +1096,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo return new Sync(val); } } - }), - ctx - ); + }); if (log.isDebugEnabled()) log.debug("Initialized internal sync structure: " + sync); @@ -1138,7 +1127,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo return; // Check if update came from this node. - boolean local = sync.isLockedLocally(val.getId()); + boolean loc = sync.isLockedLocally(val.getId()); // Process any incoming signals. boolean incomingSignals = sync.checkIncomingSignals(val); @@ -1153,7 +1142,7 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo sync.setCurrentOwnerThread(val.getThreadId()); // Check if any threads waiting on this node need to be notified. - if ((incomingSignals || sync.getPermits() == 0) && !local) { + if ((incomingSignals || sync.getPermits() == 0) && !loc) { // Try to notify any waiting threads. sync.release(0); } @@ -1171,9 +1160,8 @@ public final class GridCacheLockImpl implements GridCacheLockEx, IgniteChangeGlo if (nodeId.equals(sync.getOwnerNode())) { sync.setBroken(true); - if (!sync.failoverSafe) { + if (!sync.failoverSafe) sync.interruptAll(); - } } // Try to notify any waiting threads. http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java index e38c772..b31a154 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheQueueProxy.java @@ -25,14 +25,12 @@ import java.io.ObjectOutput; import java.io.ObjectStreamException; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheGateway; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; @@ -93,18 +91,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.add(item); - } - }, cctx); - return delegate.add(item); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -115,18 +103,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.offer(item); - } - }, cctx); - return delegate.offer(item); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -137,18 +115,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.addAll(items); - } - }, cctx); - return delegate.addAll(items); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -160,18 +128,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.contains(item); - } - }, cctx); - return delegate.contains(item); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -182,18 +140,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.containsAll(items); - } - }, cctx); - return delegate.containsAll(items); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -204,20 +152,7 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) { - CU.outTx(new Callable() { - @Override public Void call() throws Exception { - delegate.clear(); - - return null; - } - }, cctx); - } - else - delegate.clear(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + delegate.clear(); } finally { gate.leave(); @@ -230,18 +165,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.remove(item); - } - }, cctx); - return delegate.remove(item); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -252,18 +177,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.removeAll(items); - } - }, cctx); - return delegate.removeAll(items); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -274,18 +189,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.isEmpty(); - } - }, cctx); - return delegate.isEmpty(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -296,18 +201,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable>() { - @Override public Iterator call() throws Exception { - return delegate.iterator(); - } - }, cctx); - return delegate.iterator(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -318,18 +213,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Object[] call() throws Exception { - return delegate.toArray(); - } - }, cctx); - return delegate.toArray(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -341,18 +226,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T1[] call() throws Exception { - return delegate.toArray(a); - } - }, cctx); - return delegate.toArray(a); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -363,18 +238,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.retainAll(items); - } - }, cctx); - return delegate.retainAll(items); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -385,18 +250,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Integer call() throws Exception { - return delegate.size(); - } - }, cctx); - return delegate.size(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -407,18 +262,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T call() throws Exception { - return delegate.poll(); - } - }, cctx); - return delegate.poll(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -429,18 +274,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T call() throws Exception { - return delegate.peek(); - } - }, cctx); - return delegate.peek(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -451,20 +286,7 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) { - CU.outTx(new Callable() { - @Override public Void call() throws Exception { - delegate.clear(batchSize); - - return null; - } - }, cctx); - } - else - delegate.clear(batchSize); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + delegate.clear(batchSize); } finally { gate.leave(); @@ -476,18 +298,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Integer call() throws Exception { - return delegate.remainingCapacity(); - } - }, cctx); - return delegate.remainingCapacity(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -498,18 +310,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Integer call() throws Exception { - return delegate.drainTo(c); - } - }, cctx); - return delegate.drainTo(c); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -520,18 +322,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Integer call() throws Exception { - return delegate.drainTo(c, maxElements); - } - }, cctx); - return delegate.drainTo(c, maxElements); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -542,18 +334,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T call() throws Exception { - return delegate.remove(); - } - }, cctx); - return delegate.remove(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -564,18 +346,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T call() throws Exception { - return delegate.element(); - } - }, cctx); - return delegate.element(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -586,20 +358,7 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) { - CU.outTx(new Callable() { - @Override public Void call() throws Exception { - delegate.put(item); - - return null; - } - }, cctx); - } - else - delegate.put(item); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + delegate.put(item); } finally { gate.leave(); @@ -611,18 +370,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.offer(item, timeout, unit); - } - }, cctx); - return delegate.offer(item, timeout, unit); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -633,18 +382,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T call() throws Exception { - return delegate.take(); - } - }, cctx); - return delegate.take(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -655,18 +394,8 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T call() throws Exception { - return delegate.poll(timeout, unit); - } - }, cctx); - return delegate.poll(timeout, unit); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -677,20 +406,7 @@ public class GridCacheQueueProxy implements IgniteQueue, Externalizable { gate.enter(); try { - if (cctx.transactional()) { - CU.outTx(new Callable() { - @Override public Void call() throws Exception { - delegate.close(); - - return null; - } - }, cctx); - } - else - delegate.close(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + delegate.close(); } finally { gate.leave(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java index 0039fa2..edc322e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java @@ -109,6 +109,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit * Synchronization implementation for semaphore. Uses AQS state to represent permits. */ final class Sync extends AbstractQueuedSynchronizer { + /** */ private static final long serialVersionUID = 1192457210091910933L; /** Map containing number of acquired permits for each node waiting on this semaphore. */ @@ -132,7 +133,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit * * @param nodeMap NodeMap. */ - protected synchronized void setWaiters(Map nodeMap) { + synchronized void setWaiters(Map nodeMap) { this.nodeMap = nodeMap; } @@ -141,7 +142,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit * * @return Number of nodes waiting at this semaphore. */ - public int getWaiters() { + int getWaiters() { int totalWaiters = 0; for (Integer i : nodeMap.values()) { @@ -159,7 +160,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit * @return Number of permits node has acquired at this semaphore. Can be less than 0 if more permits were * released than acquired on node. */ - public int getPermitsForNode(UUID nodeID) { + int getPermitsForNode(UUID nodeID) { return nodeMap.containsKey(nodeID) ? nodeMap.get(nodeID) : 0; } @@ -220,9 +221,8 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit int remaining = available - acquires; - if (remaining < 0 || compareAndSetGlobalState(available, remaining, false)) { + if (remaining < 0 || compareAndSetGlobalState(available, remaining, false)) return remaining; - } } } @@ -270,10 +270,10 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit if (broken) return 1; - int current = getState(); + int curr = getState(); - if (current == 0 || compareAndSetGlobalState(current, 0, true)) - return current; + if (curr == 0 || compareAndSetGlobalState(curr, 0, true)) + return curr; } } @@ -285,10 +285,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit * @param draining True if used for draining the permits. * @return True if this is the call that succeeded to change the global state. */ - protected boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) { + boolean compareAndSetGlobalState(final int expVal, final int newVal, final boolean draining) { try { - return CU.outTx( - retryTopologySafe(new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, @@ -343,9 +342,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit throw e; } } - }), - ctx - ); + }); } catch (IgniteCheckedException e) { if (ctx.kernalContext().isStopping()) { @@ -367,10 +364,9 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit * @param broken Flag indicating that this semaphore is broken. * @return True if this is the call that succeeded to change the global state. */ - protected boolean releaseFailedNode(final UUID nodeId, final boolean broken) { + boolean releaseFailedNode(final UUID nodeId, final boolean broken) { try { - return CU.outTx( - retryTopologySafe(new Callable() { + return retryTopologySafe(new Callable() { @Override public Boolean call() throws Exception { try ( GridNearTxLocal tx = CU.txStartInternal(ctx, @@ -434,9 +430,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit throw e; } } - }), - ctx - ); + }); } catch (IgniteCheckedException e) { if (ctx.kernalContext().isStopping()) { @@ -484,8 +478,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit private void initializeSemaphore() throws IgniteCheckedException { if (!initGuard.get() && initGuard.compareAndSet(false, true)) { try { - sync = CU.outTx( - retryTopologySafe(new Callable() { + sync = retryTopologySafe(new Callable() { @Override public Sync call() throws Exception { try (GridNearTxLocal tx = CU.txStartInternal(ctx, semView, PESSIMISTIC, REPEATABLE_READ)) { @@ -513,9 +506,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit return sync; } } - }), - ctx - ); + }); if (log.isDebugEnabled()) log.debug("Initialized internal sync structure: " + sync); @@ -730,8 +721,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit try { initializeSemaphore(); - ret = CU.outTx( - retryTopologySafe(new Callable() { + ret = retryTopologySafe(new Callable() { @Override public Integer call() throws Exception { try ( GridNearTxLocal tx = CU.txStartInternal(ctx, @@ -749,9 +739,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit return cnt; } } - }), - ctx - ); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -787,7 +775,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit try { initializeSemaphore(); - boolean result = sync.nonfairTryAcquireShared(1) >= 0; + boolean res = sync.nonfairTryAcquireShared(1) >= 0; if (isBroken()) { Thread.interrupted(); // Clear interrupt flag. @@ -795,7 +783,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit throw new InterruptedException(); } - return result; + return res; } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -815,7 +803,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit try { initializeSemaphore(); - boolean result = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); + boolean res = sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); if (isBroken()) { Thread.interrupted(); // Clear interrupt flag. @@ -823,7 +811,7 @@ public final class GridCacheSemaphoreImpl implements GridCacheSemaphoreEx, Ignit throw new InterruptedException(); } - return result; + return res; } catch (IgniteCheckedException e) { throw U.convertException(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java index 0eb6307..3dfa71f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSetProxy.java @@ -25,14 +25,12 @@ import java.io.ObjectOutput; import java.io.ObjectStreamException; import java.util.Collection; import java.util.Iterator; -import java.util.concurrent.Callable; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteSet; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheGateway; import org.apache.ignite.internal.util.GridSpinBusyLock; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteCallable; @@ -113,18 +111,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Integer call() throws Exception { - return delegate.size(); - } - }, cctx); - return delegate.size(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -142,18 +130,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.isEmpty(); - } - }, cctx); - return delegate.isEmpty(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -171,18 +149,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.contains(o); - } - }, cctx); - return delegate.contains(o); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -200,18 +168,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Object[] call() throws Exception { - return delegate.toArray(); - } - }, cctx); - return delegate.toArray(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -229,18 +187,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public T1[] call() throws Exception { - return delegate.toArray(a); - } - }, cctx); - return delegate.toArray(a); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -258,18 +206,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.add(t); - } - }, cctx); - return delegate.add(t); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -287,18 +225,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.remove(o); - } - }, cctx); - return delegate.remove(o); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -316,18 +244,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.containsAll(c); - } - }, cctx); - return delegate.containsAll(c); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -345,18 +263,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.addAll(c); - } - }, cctx); - return delegate.addAll(c); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -374,18 +282,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.retainAll(c); - } - }, cctx); - return delegate.retainAll(c); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -403,18 +301,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable() { - @Override public Boolean call() throws Exception { - return delegate.removeAll(c); - } - }, cctx); - return delegate.removeAll(c); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -432,20 +320,7 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) { - CU.outTx(new Callable() { - @Override public Void call() throws Exception { - delegate.clear(); - - return null; - } - }, cctx); - } - else - delegate.clear(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + delegate.clear(); } finally { gate.leave(); @@ -464,18 +339,8 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) - return CU.outTx(new Callable>() { - @Override public Iterator call() throws Exception { - return delegate.iterator(); - } - }, cctx); - return delegate.iterator(); } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } finally { gate.leave(); } @@ -490,20 +355,7 @@ public class GridCacheSetProxy implements IgniteSet, Externalizable { gate.enter(); try { - if (cctx.transactional()) { - CU.outTx(new Callable() { - @Override public Void call() throws Exception { - delegate.close(); - - return null; - } - }, cctx); - } - else - delegate.close(); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); + delegate.close(); } finally { gate.leave(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ee955df9/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java index 846eb69..6a19281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridTransactionalCacheQueueImpl.java @@ -75,7 +75,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter return retVal; } } - }).call(); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -120,7 +120,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter } } } - }).call(); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -169,7 +169,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter return retVal; } } - }).call(); + }); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -202,7 +202,7 @@ public class GridTransactionalCacheQueueImpl extends GridCacheQueueAdapter return null; } - }).call(); + }); } catch (RuntimeException e) { throw e;