Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 038EB200CBA for ; Mon, 3 Jul 2017 11:15:52 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 01F0F160BEC; Mon, 3 Jul 2017 09:15:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 21623160BE4 for ; Mon, 3 Jul 2017 11:15:50 +0200 (CEST) Received: (qmail 74858 invoked by uid 500); 3 Jul 2017 09:15:50 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 74849 invoked by uid 99); 3 Jul 2017 09:15:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 03 Jul 2017 09:15:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 58FF8E0921; Mon, 3 Jul 2017 09:15:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Message-Id: <93d7b970d0ac4c48bf8d6eda07b98bb4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: IGNITE-5613 - Fixed deadlock on sequence update inside transaction Date: Mon, 3 Jul 2017 09:15:49 +0000 (UTC) archived-at: Mon, 03 Jul 2017 09:15:52 -0000 Repository: ignite Updated Branches: refs/heads/master b762417ab -> 7db925c1c IGNITE-5613 - Fixed deadlock on sequence update inside transaction Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7db925c1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7db925c1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7db925c1 Branch: refs/heads/master Commit: 7db925c1c445260ffe45a8ede54d24d99db0fddd Parents: b762417 Author: Alexey Goncharuk Authored: Thu Jun 29 17:11:39 2017 +0300 Committer: Alexey Goncharuk Committed: Mon Jul 3 12:14:59 2017 +0300 ---------------------------------------------------------------------- .../GridCacheAtomicSequenceImpl.java | 99 ++++++++++++-------- 1 file changed, 59 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7db925c1/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java index 5a87e4a..31ec16f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicSequenceImpl.java @@ -30,6 +30,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.IgniteInternalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; @@ -91,8 +92,14 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc /** Sequence batch size */ private volatile int batchSize; - /** Synchronization lock. */ - private final Lock lock = new ReentrantLock(); + /** Synchronization lock for local value updates. */ + private final Lock localUpdate = new ReentrantLock(); + + /** Synchronization for distributed sequence update. Acquired by threads with free topology (not in TX). */ + private final ReentrantLock distUpdateFreeTop = new ReentrantLock(); + + /** Synchronization for distributed sequence update. Acquired by threads with locked topology (inside TX). */ + private final ReentrantLock distUpdateLockedTop = new ReentrantLock(); /** Callable for execution {@link #incrementAndGet} operation in async and sync mode. */ private final Callable incAndGetCall = internalUpdate(1, true); @@ -214,7 +221,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc assert l > 0; - lock.lock(); + localUpdate.lock(); try { // If reserved range isn't exhausted. @@ -225,7 +232,24 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc return updated ? locVal0 + l : locVal0; } + } + finally { + localUpdate.unlock(); + } + + AffinityTopologyVersion lockedVer = ctx.shared().lockedTopologyVersion(null); + + // We need two separate locks here because two independent thread may attempt to update the sequence + // simultaneously, one thread with locked topology and other with unlocked. + // We cannot use the same lock for both cases because it leads to a deadlock when free-topology thread + // waits for topology change, and locked topology thread waits to acquire the lock. + // If a thread has locked topology, it must bypass sync with non-locked threads, but at the same time + // we do not want multiple threads to attempt to run identical cache updates. + ReentrantLock distLock = lockedVer == null ? distUpdateFreeTop : distUpdateLockedTop; + + distLock.lock(); + try { if (updateCall == null) updateCall = internalUpdate(l, updated); @@ -240,7 +264,7 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } } finally { - lock.unlock(); + distLock.unlock(); } } @@ -260,13 +284,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc @Override public void batchSize(int size) { A.ensure(size > 0, " Batch size can't be less then 0: " + size); - lock.lock(); + localUpdate.lock(); try { batchSize = size; } finally { - lock.unlock(); + localUpdate.unlock(); } } @@ -348,6 +372,8 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc private Callable internalUpdate(final long l, final boolean updated) { return new Callable() { @Override public Long call() throws Exception { + assert distUpdateFreeTop.isHeldByCurrentThread() || distUpdateLockedTop.isHeldByCurrentThread(); + try (GridNearTxLocal tx = CU.txStartInternal(ctx, seqView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicSequenceValue seq = seqView.get(key); @@ -359,46 +385,39 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc long newUpBound; - lock.lock(); + curLocVal = locVal; - try { - curLocVal = locVal; + // If local range was already reserved in another thread. + if (curLocVal + l <= upBound) { + locVal = curLocVal + l; - // If local range was already reserved in another thread. - if (curLocVal + l <= upBound) { - locVal = curLocVal + l; - - return updated ? curLocVal + l : curLocVal; - } + return updated ? curLocVal + l : curLocVal; + } - long curGlobalVal = seq.get(); + long curGlobalVal = seq.get(); - long newLocVal; + long newLocVal; - /* We should use offset because we already reserved left side of range.*/ - long off = batchSize > 1 ? batchSize - 1 : 1; + /* We should use offset because we already reserved left side of range.*/ + long off = batchSize > 1 ? batchSize - 1 : 1; - // Calculate new values for local counter, global counter and upper bound. - if (curLocVal + l >= curGlobalVal) { - newLocVal = curLocVal + l; + // Calculate new values for local counter, global counter and upper bound. + if (curLocVal + l >= curGlobalVal) { + newLocVal = curLocVal + l; - newUpBound = newLocVal + off; - } - else { - newLocVal = curGlobalVal; + newUpBound = newLocVal + off; + } + else { + newLocVal = curGlobalVal; - newUpBound = newLocVal + off; - } + newUpBound = newLocVal + off; + } - locVal = newLocVal; - upBound = newUpBound; + locVal = newLocVal; + upBound = newUpBound; - if (updated) - curLocVal = newLocVal; - } - finally { - lock.unlock(); - } + if (updated) + curLocVal = newLocVal; // Global counter must be more than reserved upper bound. seq.set(newUpBound + 1); @@ -419,13 +438,13 @@ public final class GridCacheAtomicSequenceImpl implements GridCacheAtomicSequenc } /** {@inheritDoc} */ - @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { - this.seqView = kctx.cache().atomicsCache(); - this.ctx = seqView.context(); + @Override public void onActivate(GridKernalContext kctx) { + seqView = kctx.cache().atomicsCache(); + ctx = seqView.context(); } /** {@inheritDoc} */ - @Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException { + @Override public void onDeActivate(GridKernalContext kctx) { }