Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id B6C73200C78 for ; Wed, 12 Apr 2017 09:33:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id B57A6160B8A; Wed, 12 Apr 2017 07:33:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 73308160BB4 for ; Wed, 12 Apr 2017 09:33:38 +0200 (CEST) Received: (qmail 77220 invoked by uid 500); 12 Apr 2017 07:33:37 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 75814 invoked by uid 99); 12 Apr 2017 07:33:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 12 Apr 2017 07:33:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AEB7DF16B2; Wed, 12 Apr 2017 07:33:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.apache.org Date: Wed, 12 Apr 2017 07:33:52 -0000 Message-Id: In-Reply-To: <2769ac15f51146b08f1b0cfbfcde8d6c@git.apache.org> References: <2769ac15f51146b08f1b0cfbfcde8d6c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [21/57] [abbrv] ignite git commit: ignite-4571 - reviewed contribution archived-at: Wed, 12 Apr 2017 07:33:39 -0000 ignite-4571 - reviewed contribution Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b99c1980 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b99c1980 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b99c1980 Branch: refs/heads/ignite-3477-debug Commit: b99c1980dfe250b3f24a94eb4b6cb948bb314ab5 Parents: 5c48260 Author: Konstantin Dudkov Authored: Mon Apr 10 18:29:33 2017 +0300 Committer: Yakov Zhdanov Committed: Mon Apr 10 18:29:33 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAtomicFuture.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 84 ++++++++++++++++---- .../cache/GridDeferredAckMessageSender.java | 11 ++- .../GridDhtAtomicAbstractUpdateFuture.java | 4 +- .../GridNearAtomicAbstractUpdateFuture.java | 2 +- .../GridNearAtomicSingleUpdateFuture.java | 21 ++--- .../dht/atomic/GridNearAtomicUpdateFuture.java | 23 +++--- .../cache/transactions/IgniteTxManager.java | 2 +- 8 files changed, 102 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java index 8df229e..87ae29c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicFuture.java @@ -27,7 +27,7 @@ public interface GridCacheAtomicFuture extends GridCacheFuture { /** * @return Future ID. */ - public Long id(); + public long id(); /** * Gets future that will be completed when it is safe when update is finished on the given version of topology. http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index dff2c88..712d136 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -81,6 +81,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** Maxim number of removed locks. */ private static final int MAX_REMOVED_LOCKS = 10240; + /** Maxim number of atomic IDs for thread. Must be power of two! */ + protected static final int THREAD_RESERVE_SIZE = 0x4000; + /** */ private static final int MAX_NESTED_LSNR_CALLS = getInteger(IGNITE_MAX_NESTED_LISTENER_CALLS, 5); @@ -106,9 +109,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { @GridToStringExclude private final ConcurrentMap>> mvccFuts = newMap(); - /** */ - private final AtomicLong atomicFutId = new AtomicLong(U.currentTimeMillis()); - /** Pending atomic futures. */ private final ConcurrentHashMap8> atomicFuts = new ConcurrentHashMap8<>(); @@ -138,6 +138,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** */ private volatile boolean stopping; + /** Global atomic id counter. */ + protected final AtomicLong globalAtomicCnt = new AtomicLong(); + + /** Per thread atomic id counter. */ + private final ThreadLocal threadAtomicCnt = new ThreadLocal() { + @Override protected LongWrapper initialValue() { + return new LongWrapper(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE)); + } + }; + /** Lock callback. */ @GridToStringExclude private final GridCacheMvccCallback cb = new GridCacheMvccCallback() { @@ -256,9 +266,9 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { cacheFut.onNodeLeft(discoEvt.eventNode().id()); if (cacheFut.isCancelled() || cacheFut.isDone()) { - Long futId = cacheFut.id(); + long futId = cacheFut.id(); - if (futId != null) + if (futId > 0) atomicFuts.remove(futId, cacheFut); } } @@ -426,18 +436,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @return ID for atomic cache update future. - */ - public long atomicFutureId() { - return atomicFutId.incrementAndGet(); - } - - /** * @param futId Future ID. * @param fut Future. * @return {@code False} if future was forcibly completed with error. */ - public boolean addAtomicFuture(Long futId, GridCacheAtomicFuture fut) { + public boolean addAtomicFuture(long futId, GridCacheAtomicFuture fut) { IgniteInternalFuture old = atomicFuts.put(futId, fut); assert old == null : "Old future is not null [futId=" + futId + ", fut=" + fut + ", old=" + old + ']'; @@ -472,7 +475,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param futId Future ID. * @return Future. */ - @Nullable public IgniteInternalFuture atomicFuture(Long futId) { + @Nullable public IgniteInternalFuture atomicFuture(long futId) { return atomicFuts.get(futId); } @@ -480,7 +483,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param futId Future ID. * @return Removed future. */ - @Nullable public IgniteInternalFuture removeAtomicFuture(Long futId) { + @Nullable public IgniteInternalFuture removeAtomicFuture(long futId) { return atomicFuts.remove(futId); } @@ -1174,6 +1177,20 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * @return Next future ID for atomic futures. + */ + public long nextAtomicId() { + LongWrapper cnt = threadAtomicCnt.get(); + + long res = cnt.getAndIncrement(); + + if ((cnt.get() & (THREAD_RESERVE_SIZE - 1)) == 0) + cnt.set(globalAtomicCnt.getAndAdd(THREAD_RESERVE_SIZE)); + + return res; + } + + /** * */ private class FinishLockFuture extends GridFutureAdapter { @@ -1382,4 +1399,41 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return S.toString(DataStreamerFuture.class, this, super.toString()); } } + + /** Long wrapper. */ + private static class LongWrapper { + /** */ + private long val; + + /** + * @param val Value. + */ + public LongWrapper(long val) { + this.val = val + 1; + + if (this.val == 0) + this.val = 1; + } + + /** + * @param val Value to set. + */ + public void set(long val) { + this.val = val; + } + + /** + * @return Current value. + */ + public long get() { + return val; + } + + /** + * @return Current value (and stores incremented value). + */ + public long getAndIncrement() { + return val++; + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java index 5265ec9..89aa725 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridDeferredAckMessageSender.java @@ -21,7 +21,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -33,7 +32,7 @@ import org.jsr166.ConcurrentLinkedDeque8; /** * */ -public abstract class GridDeferredAckMessageSender { +public abstract class GridDeferredAckMessageSender { /** Deferred message buffers. */ private ConcurrentMap deferredAckMsgBuffers = new ConcurrentHashMap8<>(); @@ -67,7 +66,7 @@ public abstract class GridDeferredAckMessageSender { * @param nodeId Node ID. * @param vers Versions to send. */ - public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8 vers); + public abstract void finish(UUID nodeId, ConcurrentLinkedDeque8 vers); /** * @@ -81,7 +80,7 @@ public abstract class GridDeferredAckMessageSender { * @param nodeId Node ID to send message to. * @param ver Version to ack. */ - public void sendDeferredAckMessage(UUID nodeId, GridCacheVersion ver) { + public void sendDeferredAckMessage(UUID nodeId, T ver) { while (true) { DeferredAckMessageBuffer buf = deferredAckMsgBuffers.get(nodeId); @@ -117,7 +116,7 @@ public abstract class GridDeferredAckMessageSender { private AtomicBoolean guard = new AtomicBoolean(false); /** Versions. */ - private ConcurrentLinkedDeque8 vers = new ConcurrentLinkedDeque8<>(); + private ConcurrentLinkedDeque8 vers = new ConcurrentLinkedDeque8<>(); /** Node ID. */ private final UUID nodeId; @@ -173,7 +172,7 @@ public abstract class GridDeferredAckMessageSender { * @param ver Version to send. * @return {@code True} if request was handled, {@code false} if this buffer is filled and cannot be used. */ - public boolean add(GridCacheVersion ver) { + public boolean add(T ver) { readLock().lock(); boolean snd = false; http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java index 5ff5aa4..0940acb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java @@ -110,7 +110,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte this.updateReq = updateReq; this.writeVer = writeVer; - futId = cctx.mvcc().atomicFutureId(); + futId = cctx.mvcc().nextAtomicId(); if (log == null) { msgLog = cctx.shared().atomicMessageLogger(); @@ -295,7 +295,7 @@ public abstract class GridDhtAtomicAbstractUpdateFuture extends GridFutureAdapte } /** {@inheritDoc} */ - @Override public final Long id() { + @Override public final long id() { return futId; } http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 39abb73..a2adb05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -138,7 +138,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt /** Future ID. */ @GridToStringInclude - protected Long futId; + protected long futId; /** Operation result. */ protected GridCacheReturn opRes; http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index c2372d1..e4ba457 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -125,7 +125,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda } /** {@inheritDoc} */ - @Override public Long id() { + @Override public long id() { synchronized (mux) { return futId; } @@ -216,7 +216,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda AffinityTopologyVersion remapTopVer0; synchronized (mux) { - if (futId == null || futId != res.futureId()) + if (futId == 0 || futId != res.futureId()) return; assert reqState != null; @@ -258,7 +258,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda CachePartialUpdateCheckedException err0 = null; synchronized (mux) { - if (futId == null || futId != res.futureId()) + if (futId == 0 || futId != res.futureId()) return; req = reqState.processPrimaryResponse(nodeId, res); @@ -331,7 +331,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda * @return Non-null topology version if update should be remapped. */ private AffinityTopologyVersion onAllReceived() { - assert futId != null; + assert Thread.holdsLock(mux); + assert futId > 0; AffinityTopologyVersion remapTopVer0 = null; @@ -362,7 +363,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda cctx.mvcc().removeAtomicFuture(futId); reqState = null; - futId = null; + futId = 0; topVer = AffinityTopologyVersion.ZERO; remapTopVer = null; @@ -479,7 +480,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** {@inheritDoc} */ @Override protected void map(AffinityTopologyVersion topVer) { - long futId = cctx.mvcc().atomicFutureId(); + long futId = cctx.mvcc().nextAtomicId(); Exception err = null; PrimaryRequestState reqState0 = null; @@ -488,7 +489,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda reqState0 = mapSingleUpdate(topVer, futId); synchronized (mux) { - assert this.futId == null : this; + assert this.futId == 0 : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; this.topVer = topVer; @@ -529,7 +530,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda /** * @param futId Future ID. */ - private void checkDhtNodes(Long futId) { + private void checkDhtNodes(long futId) { GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; AffinityTopologyVersion remapTopVer0 = null; @@ -537,7 +538,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda GridNearAtomicCheckUpdateRequest checkReq = null; synchronized (mux) { - if (this.futId == null || !this.futId.equals(futId)) + if (this.futId == 0 || this.futId != futId) return; assert reqState != null; @@ -570,7 +571,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda synchronized (mux) { id0 = futId; - futId = null; + futId = 0; } return id0; http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index a44ccf9..84deefc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -150,7 +150,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** {@inheritDoc} */ - @Override public Long id() { + @Override public long id() { synchronized (mux) { return futId; } @@ -167,7 +167,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu List checkReqs = null; synchronized (mux) { - if (futId == null) + if (futId == 0) return false; if (singleReq != null) { @@ -300,7 +300,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu AffinityTopologyVersion remapTopVer0; synchronized (mux) { - if (futId == null || futId != res.futureId()) + if (futId == 0 || futId != res.futureId()) return; PrimaryRequestState reqState; @@ -373,7 +373,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll; synchronized (mux) { - if (futId == null || futId != res.futureId()) + if (futId == 0 || futId != res.futureId()) return; if (singleReq != null) { @@ -534,7 +534,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu * @return Non null topology version if update should be remapped. */ @Nullable private AffinityTopologyVersion onAllReceived() { - assert futId != null; + assert Thread.holdsLock(mux); + assert futId > 0; AffinityTopologyVersion remapTopVer0 = null; @@ -577,7 +578,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu if (remapTopVer0 != null) { cctx.mvcc().removeAtomicFuture(futId); - futId = null; + futId = 0; topVer = AffinityTopologyVersion.ZERO; remapTopVer = null; @@ -767,7 +768,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return; } - Long futId = cctx.mvcc().atomicFutureId(); + long futId = cctx.mvcc().nextAtomicId(); Exception err = null; PrimaryRequestState singleReq0 = null; @@ -801,7 +802,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } synchronized (mux) { - assert this.futId == null : this; + assert this.futId == 0 : this; assert this.topVer == AffinityTopologyVersion.ZERO : this; this.topVer = topVer; @@ -856,7 +857,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu checkDhtNodes(futId); } - private void checkDhtNodes(Long futId) { + private void checkDhtNodes(long futId) { GridCacheReturn opRes0 = null; CachePartialUpdateCheckedException err0 = null; AffinityTopologyVersion remapTopVer0 = null; @@ -866,7 +867,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu boolean rcvAll = false; synchronized (mux) { - if (this.futId == null || !this.futId.equals(futId)) + if (this.futId == 0 || this.futId != futId) return; if (singleReq != null) { @@ -940,7 +941,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu synchronized (mux) { id0 = futId; - futId = null; + futId = 0; } return id0; http://git-wip-us.apache.org/repos/asf/ignite/blob/b99c1980/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index d1334ef..6b383a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -211,7 +211,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { txHnd = new IgniteTxHandler(cctx); - deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { + deferredAckMsgSnd = new GridDeferredAckMessageSender(cctx.time(), cctx.kernalContext().closure()) { @Override public int getTimeout() { return DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; }