Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D357218AD5 for ; Mon, 9 Nov 2015 09:49:48 +0000 (UTC) Received: (qmail 27846 invoked by uid 500); 9 Nov 2015 09:49:48 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 27663 invoked by uid 500); 9 Nov 2015 09:49:48 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 26718 invoked by uid 99); 9 Nov 2015 09:49:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Nov 2015 09:49:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 90DD9DFF95; Mon, 9 Nov 2015 09:49:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Mon, 09 Nov 2015 09:50:30 -0000 Message-Id: In-Reply-To: <1c1f1d05ec054e519b8494b886c23fcb@git.apache.org> References: <1c1f1d05ec054e519b8494b886c23fcb@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/49] ignite git commit: Performance optimizations. http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 1c82636..eb2ca2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -24,16 +24,14 @@ import java.io.ObjectOutput; import java.io.ObjectStreamException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import javax.cache.expiry.ExpiryPolicy; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -47,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -59,7 +56,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter; import org.apache.ignite.internal.util.lang.GridTuple; @@ -199,20 +195,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter protected boolean transform; /** Commit version. */ - private AtomicReference commitVer = new AtomicReference<>(null); - - /** Done marker. */ - protected final AtomicBoolean isDone = new AtomicBoolean(false); + private volatile GridCacheVersion commitVer; /** */ private AtomicReference finalizing = new AtomicReference<>(FinalizationStatus.NONE); - /** Preparing flag. */ - private AtomicBoolean preparing = new AtomicBoolean(); + /** Done marker. */ + protected volatile boolean isDone; + + /** Preparing flag (no need for volatile modifier). */ + private boolean preparing; /** */ @GridToStringInclude - private Map> invalidParts = new HashMap<>(3); + private Map> invalidParts; /** * Transaction state. Note that state is not protected, as we want to @@ -230,17 +226,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** */ @GridToStringExclude - private AtomicReference> finFut = new AtomicReference<>(); + private volatile GridFutureAdapter finFut; /** Topology version. */ @GridToStringInclude - protected AtomicReference topVer = new AtomicReference<>(AffinityTopologyVersion.NONE); - - /** Mutex. */ - private final Lock lock = new ReentrantLock(); - - /** Lock condition. */ - private final Condition cond = lock.newCondition(); + protected volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE; /** */ protected Map> txNodes; @@ -387,37 +377,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** - * Acquires lock. - */ - @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) - protected final void lock() { - lock.lock(); - } - - /** - * Releases lock. - */ - protected final void unlock() { - lock.unlock(); - } - - /** - * Signals all waiters. - */ - protected final void signalAll() { - cond.signalAll(); - } - - /** - * Waits for signal. - * - * @throws InterruptedException If interrupted. - */ - protected final void awaitSignal() throws InterruptedException { - cond.await(); - } - - /** * Checks whether near cache should be updated. * * @return Flag indicating whether near cache should be updated. @@ -548,7 +507,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { - AffinityTopologyVersion res = topVer.get(); + AffinityTopologyVersion res = topVer; if (res.equals(AffinityTopologyVersion.NONE)) return cctx.exchange().topologyVersion(); @@ -558,16 +517,29 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersionSnapshot() { - AffinityTopologyVersion ret = topVer.get(); + AffinityTopologyVersion ret = topVer; return AffinityTopologyVersion.NONE.equals(ret) ? null : ret; } /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion(AffinityTopologyVersion topVer) { - this.topVer.compareAndSet(AffinityTopologyVersion.NONE, topVer); + AffinityTopologyVersion topVer0 = this.topVer; - return this.topVer.get(); + if (!AffinityTopologyVersion.NONE.equals(topVer0)) + return topVer0; + + synchronized (this) { + topVer0 = this.topVer; + + if (AffinityTopologyVersion.NONE.equals(topVer0)) { + this.topVer = topVer; + + return topVer; + } + + return topVer0; + } } /** {@inheritDoc} */ @@ -582,7 +554,14 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean markPreparing() { - return preparing.compareAndSet(false, true); + synchronized (this) { + if (preparing) + return false; + + preparing = true; + + return true; + } } /** @@ -730,15 +709,18 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public Map> invalidPartitions() { - return invalidParts; + return invalidParts == null ? Collections.>emptyMap() : invalidParts; } /** {@inheritDoc} */ @Override public void addInvalidPartition(GridCacheContext cacheCtx, int part) { + if (invalidParts == null) + invalidParts = new HashMap<>(); + Set parts = invalidParts.get(cacheCtx.cacheId()); if (parts == null) { - parts = new GridLeanSet<>(); + parts = new HashSet<>(); invalidParts.put(cacheCtx.cacheId(), parts); } @@ -879,32 +861,71 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean done() { - return isDone.get(); + return isDone; } /** - * @return Commit version. + * @return {@code True} if done flag has been set by this call. */ - @Override public GridCacheVersion commitVersion() { - initCommitVersion(); + private boolean setDone() { + boolean isDone0 = isDone; + + if (isDone0) + return false; + + synchronized (this) { + isDone0 = isDone; - return commitVer.get(); + if (isDone0) + return false; + + isDone = true; + + return true; + } } /** - * @param commitVer Commit version. - * @return {@code True} if set to not null value. + * @return Commit version. */ - @Override public boolean commitVersion(GridCacheVersion commitVer) { - return commitVer != null && this.commitVer.compareAndSet(null, commitVer); + @Override public GridCacheVersion commitVersion() { + GridCacheVersion commitVer0 = commitVer; + + if (commitVer0 != null) + return commitVer0; + + synchronized (this) { + commitVer0 = commitVer; + + if (commitVer0 != null) + return commitVer0; + + commitVer = commitVer0 = xidVer; + + return commitVer0; + } } /** - * + * @param commitVer Commit version. */ - public void initCommitVersion() { - if (commitVer.get() == null) - commitVer.compareAndSet(null, xidVer); + @Override public void commitVersion(GridCacheVersion commitVer) { + if (commitVer == null) + return; + + GridCacheVersion commitVer0 = this.commitVer; + + if (commitVer0 != null) + return; + + synchronized (this) { + commitVer0 = this.commitVer; + + if (commitVer0 != null) + return; + + this.commitVer = commitVer; + } } /** @@ -916,7 +937,19 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); - awaitCompletion(); + synchronized (this) { + try { + while (!done()) + wait(); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + if (!done()) + throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + + this, e); + } + } } /** {@inheritDoc} */ @@ -930,29 +963,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /* No-op. */ } - /** - * Awaits transaction completion. - * - * @throws IgniteCheckedException If waiting failed. - */ - protected void awaitCompletion() throws IgniteCheckedException { - lock(); - - try { - while (!done()) - awaitSignal(); - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - - if (!done()) - throw new IgniteCheckedException("Got interrupted while waiting for transaction to complete: " + this, e); - } - finally { - unlock(); - } - } - /** {@inheritDoc} */ @Override public boolean internal() { return internal; @@ -1019,22 +1029,27 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @SuppressWarnings("ExternalizableWithoutPublicNoArgConstructor") @Override public IgniteInternalFuture finishFuture() { - GridFutureAdapter fut = finFut.get(); + GridFutureAdapter fut = finFut; if (fut == null) { - fut = new GridFutureAdapter() { - @Override public String toString() { - return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this); - } - }; + synchronized (this) { + fut = finFut; - if (!finFut.compareAndSet(null, fut)) - fut = finFut.get(); + if (fut == null) { + fut = new GridFutureAdapter() { + @Override public String toString() { + return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this); + } + }; + + finFut = fut; + } + } } assert fut != null; - if (isDone.get()) + if (isDone) fut.onDone(this); return fut; @@ -1059,9 +1074,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter boolean notify = false; - lock(); - - try { + synchronized (this) { prev = this.state; switch (state) { @@ -1087,7 +1100,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } case UNKNOWN: { - if (isDone.compareAndSet(false, true)) + if (setDone()) notify = true; valid = prev == ROLLING_BACK || prev == COMMITTING; @@ -1096,7 +1109,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } case COMMITTED: { - if (isDone.compareAndSet(false, true)) + if (setDone()) notify = true; valid = prev == COMMITTING; @@ -1105,7 +1118,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } case ROLLED_BACK: { - if (isDone.compareAndSet(false, true)) + if (setDone()) notify = true; valid = prev == ROLLING_BACK; @@ -1135,8 +1148,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter if (log.isDebugEnabled()) log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); - // Notify of state change. - signalAll(); + notifyAll(); } else { if (log.isDebugEnabled()) @@ -1144,12 +1156,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter ", tx=" + this + ']'); } } - finally { - unlock(); - } if (notify) { - GridFutureAdapter fut = finFut.get(); + GridFutureAdapter fut = finFut; if (fut != null) fut.onDone(this); @@ -2026,8 +2035,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ - @Override public boolean commitVersion(GridCacheVersion commitVer) { - return false; + @Override public void commitVersion(GridCacheVersion commitVer) { + // No-op. } /** {@inheritDoc} */ @@ -2037,7 +2046,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public void prepare() throws IgniteCheckedException { - + // No-op. } /** {@inheritDoc} */ @@ -2047,7 +2056,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public void endVersion(GridCacheVersion endVer) { - + // No-op. } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index d9786a8..570aa48 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -876,7 +876,7 @@ public class IgniteTxHandler { log.debug("Processing dht tx finish request [nodeId=" + nodeId + ", req=" + req + ']'); if (req.checkCommitted()) { - sendReply(nodeId, req, checkDhtRemoteTxCommitted(req.version())); + sendReply(nodeId, req, !ctx.tm().addRolledbackTx(null, req.version())); return; } @@ -896,8 +896,11 @@ public class IgniteTxHandler { if (req.replyRequired()) { IgniteInternalFuture completeFut; - IgniteInternalFuture dhtFin = dhtTx == null ? null : dhtTx.done() ? null : dhtTx.finishFuture(); - IgniteInternalFuture nearFin = nearTx == null ? null : nearTx.done() ? null : nearTx.finishFuture(); + IgniteInternalFuture dhtFin = dhtTx == null ? + null : dhtTx.done() ? null : dhtTx.finishFuture(); + + IgniteInternalFuture nearFin = nearTx == null ? + null : nearTx.done() ? null : nearTx.finishFuture(); if (dhtFin != null && nearFin != null) { GridCompoundFuture fut = new GridCompoundFuture(); @@ -914,8 +917,7 @@ public class IgniteTxHandler { if (completeFut != null) { completeFut.listen(new CI1>() { - @Override - public void apply(IgniteInternalFuture igniteTxIgniteFuture) { + @Override public void apply(IgniteInternalFuture igniteTxIgniteFuture) { sendReply(nodeId, req, true); } }); @@ -928,24 +930,6 @@ public class IgniteTxHandler { } /** - * Checks whether DHT remote transaction with given version has been committed. If not, will add version - * to rollback version set so that late response will not falsely commit this transaction. - * - * @param writeVer Write version to check. - * @return {@code True} if transaction has been committed, {@code false} otherwise. - */ - public boolean checkDhtRemoteTxCommitted(GridCacheVersion writeVer) { - assert writeVer != null; - - boolean committed = true; - - if (ctx.tm().addRolledbackTx(writeVer)) - committed = false; - - return committed; - } - - /** * @param nodeId Node ID. * @param tx Transaction. * @param req Request. @@ -953,7 +937,8 @@ public class IgniteTxHandler { protected void finish( UUID nodeId, IgniteTxRemoteEx tx, - GridDhtTxFinishRequest req) { + GridDhtTxFinishRequest req + ) { // We don't allow explicit locks for transactions and // therefore immediately return if transaction is null. // However, we may decide to relax this restriction in @@ -961,9 +946,9 @@ public class IgniteTxHandler { if (tx == null) { if (req.commit()) // Must be some long time duplicate, but we add it anyway. - ctx.tm().addCommittedTx(req.version(), null); + ctx.tm().addCommittedTx(tx, req.version(), null); else - ctx.tm().addRolledbackTx(req.version()); + ctx.tm().addRolledbackTx(tx, req.version()); if (log.isDebugEnabled()) log.debug("Received finish request for non-existing transaction (added to completed set) " + http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 82e5f2a..2c7bf8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -206,21 +206,21 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter int taskNameHash ) { super( - cctx, - xidVer, - implicit, - implicitSingle, - /*local*/true, - sys, + cctx, + xidVer, + implicit, + implicitSingle, + /*local*/true, + sys, plc, - concurrency, - isolation, + concurrency, + isolation, timeout, invalidate, - storeEnabled, - onePhaseCommit, - txSize, - subjId, + storeEnabled, + onePhaseCommit, + txSize, + subjId, taskNameHash ); @@ -1054,7 +1054,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter eventNodeId(), txEntry.nodeId(), false, - false, evt, metrics, topVer, @@ -1072,7 +1071,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter nodeId, false, false, - false, metrics, topVer, CU.empty0(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 1f51b8a..c2e7dea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -18,17 +18,16 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.io.Externalizable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.LinkedList; import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; @@ -62,6 +61,7 @@ import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; @@ -75,6 +75,7 @@ import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; +import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; @@ -95,6 +96,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.UNKNOWN; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; /** * Cache transaction manager. @@ -128,8 +130,18 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private IgniteTxHandler txHandler; /** Committed local transactions. */ - private final GridBoundedConcurrentOrderedMap completedVers = - new GridBoundedConcurrentOrderedMap<>(Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); + private final GridBoundedConcurrentOrderedMap completedVersSorted = + new GridBoundedConcurrentOrderedMap<>( + Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT)); + + /** Committed local transactions. */ + private final ConcurrentLinkedHashMap completedVersHashMap = + new ConcurrentLinkedHashMap<>( + Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), + 0.75f, + Runtime.getRuntime().availableProcessors() * 2, + Integer.getInteger(IGNITE_MAX_COMPLETED_TX_COUNT, DFLT_MAX_COMPLETED_TX_CNT), + PER_SEGMENT_Q); /** Transaction finish synchronizer. */ private GridCacheTxFinishSync txFinishSync; @@ -298,7 +310,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { X.println(">>> Transaction manager memory stats [grid=" + cctx.gridName() + ']'); X.println(">>> threadMapSize: " + threadMap.size()); X.println(">>> idMap [size=" + idMap.size() + ']'); - X.println(">>> completedVersSize: " + completedVers.size()); + X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); + X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } /** @@ -319,7 +332,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Committed versions size. */ public int completedVersionsSize() { - return completedVers.size(); + return completedVersHashMap.size(); } /** @@ -329,7 +342,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * {@code false} otherwise. */ public boolean isCompleted(IgniteInternalTx tx) { - return completedVers.containsKey(tx.xidVersion()); + return completedVersHashMap.containsKey(tx.xidVersion()); } /** @@ -770,65 +783,59 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @param map Collection to copy. - * @param expVal Values to copy. - * @return Copy of the collection. + * @param min Minimum version. + * @return Pair [committed, rolledback] - never {@code null}, elements potentially empty, + * but also never {@code null}. */ - private Collection copyOf(Map map, boolean expVal) { - Collection l = new LinkedList<>(); + public IgnitePair> versions(GridCacheVersion min) { + Collection committed = null; + Collection rolledback = null; - for (Map.Entry e : map.entrySet()) { - if (e.getValue() == expVal) - l.add(e.getKey()); - } + for (Map.Entry e : completedVersSorted.tailMap(min, true).entrySet()) { + if (e.getValue()) { + if (committed == null) + committed = new ArrayList<>(); - return l; - } + committed.add(e.getKey()); + } + else { + if (rolledback == null) + rolledback = new ArrayList<>(); - /** - * Gets committed transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? - * - * @param min Start (or minimum) version. - * @return Committed transactions starting from the given version (non-inclusive). - */ - public Collection committedVersions(GridCacheVersion min) { - ConcurrentNavigableMap tail - = completedVers.tailMap(min, true); + rolledback.add(e.getKey()); + } + } - return F.isEmpty(tail) ? Collections.emptyList() : copyOf(tail, true); + return F.pair( + committed == null ? Collections.emptyList() : committed, + rolledback == null ? Collections.emptyList() : rolledback); } /** - * Gets rolledback transactions starting from the given version (inclusive). // TODO: GG-4011: why inclusive? - * - * @param min Start (or minimum) version. - * @return Committed transactions starting from the given version (non-inclusive). + * @return Collection of active transactions. */ - public Collection rolledbackVersions(GridCacheVersion min) { - ConcurrentNavigableMap tail - = completedVers.tailMap(min, true); - - return F.isEmpty(tail) ? Collections.emptyList() : copyOf(tail, false); + public Collection activeTransactions() { + return F.concat(false, idMap.values(), nearIdMap.values()); } /** * @param tx Tx to remove. */ public void removeCommittedTx(IgniteInternalTx tx) { - completedVers.remove(tx.xidVersion(), true); + completedVersHashMap.remove(tx.xidVersion(), true); + + if (tx.needsCompletedVersions()) + completedVersSorted.remove(tx.xidVersion(), true); } /** * @param tx Committed transaction. - * @return If transaction was not already present in committed set. */ - public boolean addCommittedTx(IgniteInternalTx tx) { - boolean res = addCommittedTx(tx.xidVersion(), tx.nearXidVersion()); + public void addCommittedTx(IgniteInternalTx tx) { + addCommittedTx(tx, tx.xidVersion(), tx.nearXidVersion()); if (!tx.local() && !tx.near() && tx.onePhaseCommit()) - addCommittedTx(tx.nearXidVersion(), null); - - return res; + addCommittedTx(tx, tx.nearXidVersion(), null); } /** @@ -836,60 +843,52 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return If transaction was not already present in committed set. */ public boolean addRolledbackTx(IgniteInternalTx tx) { - return addRolledbackTx(tx.xidVersion()); - } - - /** - * @return Collection of active transactions. - */ - public Collection activeTransactions() { - return F.concat(false, idMap.values(), nearIdMap.values()); + return addRolledbackTx(tx, tx.xidVersion()); } /** + * @param tx Tx. * @param xidVer Completed transaction version. * @param nearXidVer Optional near transaction ID. * @return If transaction was not already present in completed set. */ - public boolean addCommittedTx(GridCacheVersion xidVer, @Nullable GridCacheVersion nearXidVer) { + public boolean addCommittedTx( + IgniteInternalTx tx, + GridCacheVersion xidVer, + @Nullable GridCacheVersion nearXidVer + ) { if (nearXidVer != null) xidVer = new CommittedVersion(xidVer, nearXidVer); - Boolean committed = completedVers.putIfAbsent(xidVer, true); + Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, true); - if (committed == null || committed) { - if (log.isDebugEnabled()) - log.debug("Added transaction to committed version set: " + xidVer); + if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { + Boolean b = completedVersSorted.putIfAbsent(xidVer, true); - return true; + assert b == null; } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is already present in rolled back version set: " + xidVer); - return false; - } + return committed0 == null || committed0; } /** + * @param tx Tx. * @param xidVer Completed transaction version. * @return If transaction was not already present in completed set. */ - public boolean addRolledbackTx(GridCacheVersion xidVer) { - Boolean committed = completedVers.putIfAbsent(xidVer, false); + public boolean addRolledbackTx( + IgniteInternalTx tx, + GridCacheVersion xidVer + ) { + Boolean committed0 = completedVersHashMap.putIfAbsent(xidVer, false); - if (committed == null || !committed) { - if (log.isDebugEnabled()) - log.debug("Added transaction to rolled back version set: " + xidVer); + if (committed0 == null && (tx == null || tx.needsCompletedVersions())) { + Boolean b = completedVersSorted.putIfAbsent(xidVer, false); - return true; + assert b == null; } - else { - if (log.isDebugEnabled()) - log.debug("Transaction is already present in committed version set: " + xidVer); - return false; - } + return committed0 == null || !committed0; } /** @@ -903,7 +902,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert min != null; - tx.completedVersions(min, committedVersions(min), rolledbackVersions(min)); + IgnitePair> versPair = versions(min); + + tx.completedVersions(min, versPair.get1(), versPair.get2()); } } @@ -1027,18 +1028,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * so we don't do it here. */ - Boolean committed = completedVers.get(tx.xidVersion()); + Boolean committed = completedVersHashMap.get(tx.xidVersion()); // 1. Make sure that committed version has been recorded. if (!((committed != null && committed) || tx.writeSet().isEmpty() || tx.isSystemInvalidate())) { uncommitTx(tx); - GridCacheVersion first = completedVers.isEmpty() ? null : completedVers.firstKey(); - GridCacheVersion last = completedVers.isEmpty() ? null : completedVers.lastKey(); - throw new IgniteException("Missing commit version (consider increasing " + - IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + ", firstVer=" + - first + ", lastVer=" + last + ", tx=" + tx.xid() + ']'); + IGNITE_MAX_COMPLETED_TX_COUNT + " system property) [ver=" + tx.xidVersion() + + ", tx=" + tx.getClass().getSimpleName() + ']'); } ConcurrentMap txIdMap = transactionMap(tx); @@ -1578,12 +1576,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return resFut; } - Boolean committed = null; - - for (Map.Entry entry : completedVers.entrySet()) { - if (entry.getValue() == null) - continue; + boolean committed = false; + for (Map.Entry entry : completedVersHashMap.entrySet()) { if (entry.getKey() instanceof CommittedVersion) { CommittedVersion comm = (CommittedVersion)entry.getKey(); @@ -1598,7 +1593,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Near transaction committed: " + committed); - resFut.onDone(committed != null && committed); + resFut.onDone(committed); return resFut; } @@ -1702,7 +1697,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // Not all transactions were found. Need to scan committed versions to check // if transaction was already committed. - for (Map.Entry e : completedVers.entrySet()) { + for (Map.Entry e : completedVersHashMap.entrySet()) { if (!e.getValue()) continue; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java index 04d1a85..7aa3734 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashMap.java @@ -116,9 +116,6 @@ public class GridBoundedConcurrentLinkedHashMap extends ConcurrentLinkedHa /** {@inheritDoc} */ @Override public String toString() { - // TODO GG-4788 - return policy() != SINGLE_Q ? - S.toString(GridBoundedConcurrentLinkedHashMap.class, this) : - S.toString(GridBoundedConcurrentLinkedHashMap.class, this, "entrySet", keySet()); + return S.toString(GridBoundedConcurrentLinkedHashMap.class, this, "entrySet", keySet()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java index a06f2ff..2801839 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentLinkedHashSet.java @@ -156,9 +156,6 @@ public class GridBoundedConcurrentLinkedHashSet extends GridSetWrapper { /** {@inheritDoc} */ @Override public String toString() { - // TODO GG-4788 - return ((ConcurrentLinkedHashMap)map()).policy() != SINGLE_Q ? - S.toString(GridBoundedConcurrentLinkedHashSet.class, this) : - S.toString(GridBoundedConcurrentLinkedHashSet.class, this, "elements", map().keySet()); + return S.toString(GridBoundedConcurrentLinkedHashSet.class, this, "elements", map().keySet()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java index b091652..3f6db30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridBoundedConcurrentOrderedMap.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.util; import java.util.Comparator; import java.util.Map; -import java.util.NoSuchElementException; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -46,7 +45,7 @@ public class GridBoundedConcurrentOrderedMap extends ConcurrentSkipListMap private static final long serialVersionUID = 0L; /** Element count. */ - private final AtomicInteger cnt = new AtomicInteger(0); + private final AtomicInteger cnt = new AtomicInteger(); /** Maximum size. */ private int max; @@ -168,35 +167,21 @@ public class GridBoundedConcurrentOrderedMap extends ConcurrentSkipListMap private void onPut() { cnt.incrementAndGet(); - int c; + IgniteBiInClosure lsnr = this.lsnr; - while ((c = cnt.get()) > max) { - // Decrement count. - if (cnt.compareAndSet(c, c - 1)) { - try { - K key = firstEntry().getKey(); + int delta = cnt.get() - max; - V val; + for (int i = 0; i < delta && cnt.get() > max; i++) { + Entry e = pollFirstEntry(); - // Make sure that an element is removed. - while ((val = super.remove(firstEntry().getKey())) == null) { - // No-op. - } + if (e == null) + return; - assert val != null; - - IgniteBiInClosure lsnr = this.lsnr; - - // Listener notification. - if (lsnr != null) - lsnr.apply(key, val); - } - catch (NoSuchElementException ignored) { - cnt.incrementAndGet(); + cnt.decrementAndGet(); - return; - } - } + // Listener notification. + if (lsnr != null) + lsnr.apply(e.getKey(), e.getValue()); } } @@ -251,4 +236,4 @@ public class GridBoundedConcurrentOrderedMap extends ConcurrentSkipListMap return rmvd; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java index 6e0e876..d1a7bb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentFactory.java @@ -18,18 +18,12 @@ package org.apache.ignite.internal.util; import java.util.concurrent.ConcurrentMap; -import org.apache.ignite.IgniteSystemProperties; import org.jsr166.ConcurrentHashMap8; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAP_CONCURRENCY_LEVEL; - /** * Concurrent map factory. */ public class GridConcurrentFactory { - /** Default concurrency level. */ - private static final int CONCURRENCY_LEVEL = IgniteSystemProperties.getInteger(IGNITE_MAP_CONCURRENCY_LEVEL, 256); - /** * Ensure singleton. */ @@ -43,7 +37,6 @@ public class GridConcurrentFactory { * @return New concurrent map. */ public static ConcurrentMap newMap() { - return new ConcurrentHashMap8<>(16 * CONCURRENCY_LEVEL, 0.75f, CONCURRENCY_LEVEL); + return new ConcurrentHashMap8<>(); } - -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java index 5a53b4b..0c76787 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridConcurrentLinkedHashSet.java @@ -24,8 +24,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; -import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.SINGLE_Q; - /** * Concurrent linked set implementation. */ @@ -123,9 +121,6 @@ public class GridConcurrentLinkedHashSet extends GridSetWrapper { /** {@inheritDoc} */ @Override public String toString() { - // TODO GG-4788 - return ((ConcurrentLinkedHashMap)map()).policy() != SINGLE_Q ? - S.toString(GridConcurrentLinkedHashSet.class, this) : - S.toString(GridConcurrentLinkedHashSet.class, this, "elements", map().keySet()); + return S.toString(GridConcurrentLinkedHashSet.class, this, "elements", map().keySet()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java index 4ca00d9..d9ffdd2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUuidCache.java @@ -20,6 +20,8 @@ package org.apache.ignite.internal.util; import java.util.UUID; import java.util.concurrent.ConcurrentMap; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; + /** * */ @@ -29,7 +31,7 @@ public final class IgniteUuidCache { /** Cache. */ private static final ConcurrentMap cache = - new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64); + new GridBoundedConcurrentLinkedHashMap<>(MAX, 1024, 0.75f, 64, PER_SEGMENT_Q); /** * Gets cached UUID to preserve memory. @@ -56,4 +58,4 @@ public final class IgniteUuidCache { private IgniteUuidCache() { // No-op. } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 0a6d9aa..31674f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -17,11 +17,9 @@ package org.apache.ignite.internal.util.future; +import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicMarkableReference; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -35,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteReducer; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; /** * Future composed of multiple inner futures. @@ -44,33 +41,38 @@ public class GridCompoundFuture extends GridFutureAdapter { /** */ private static final long serialVersionUID = 0L; - /** Futures. */ - private final ConcurrentLinkedDeque8> futs = new ConcurrentLinkedDeque8<>(); + /** */ + private static final int INITED = 0b1; - /** Pending futures. */ - private final Collection> pending = new ConcurrentLinkedDeque8<>(); + /** */ + private static final AtomicIntegerFieldUpdater flagsUpd = + AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "flags"); - /** Listener call count. */ - private final AtomicInteger lsnrCalls = new AtomicInteger(); + /** */ + private static final AtomicIntegerFieldUpdater lsnrCallsUpd = + AtomicIntegerFieldUpdater.newUpdater(GridCompoundFuture.class, "lsnrCalls"); - /** Finished flag. */ - private final AtomicBoolean finished = new AtomicBoolean(); + /** Futures. */ + private final Collection> futs = new ArrayList<>(); /** Reducer. */ @GridToStringInclude private IgniteReducer rdc; - /** Initialize flag. */ - private AtomicBoolean init = new AtomicBoolean(false); - - /** Result with a flag to control if reducer has been called. */ - private AtomicMarkableReference res = new AtomicMarkableReference<>(null, false); - /** Exceptions to ignore. */ private Class[] ignoreChildFailures; - /** Error. */ - private AtomicReference err = new AtomicReference<>(); + /** + * Updated via {@link #flagsUpd}. + * + * @see #INITED + */ + @SuppressWarnings("unused") + private volatile int flags; + + /** Updated via {@link #lsnrCallsUpd}. */ + @SuppressWarnings("unused") + private volatile int lsnrCalls; /** * @@ -104,7 +106,7 @@ public class GridCompoundFuture extends GridFutureAdapter { /** {@inheritDoc} */ @Override public boolean cancel() throws IgniteCheckedException { if (onCancelled()) { - for (IgniteInternalFuture fut : futs) + for (IgniteInternalFuture fut : futures()) fut.cancel(); return true; @@ -118,8 +120,26 @@ public class GridCompoundFuture extends GridFutureAdapter { * * @return Collection of futures. */ + private Collection> futures(boolean pending) { + synchronized (futs) { + Collection> res = new ArrayList<>(futs.size()); + + for (IgniteInternalFuture fut : futs) { + if (!pending || !fut.isDone()) + res.add(fut); + } + + return res; + } + } + + /** + * Gets collection of futures. + * + * @return Collection of futures. + */ public Collection> futures() { - return futs; + return futures(false); } /** @@ -128,7 +148,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * @return Pending futures. */ public Collection> pending() { - return pending; + return futures(true); } /** @@ -147,7 +167,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * @return {@code True} if there are pending futures. */ public boolean hasPending() { - return !pending.isEmpty(); + return !pending().isEmpty(); } /** @@ -155,7 +175,7 @@ public class GridCompoundFuture extends GridFutureAdapter { * {@link #markInitialized()} method is called on future. */ public boolean initialized() { - return init.get(); + return flagSet(INITED); } /** @@ -166,18 +186,20 @@ public class GridCompoundFuture extends GridFutureAdapter { public void add(IgniteInternalFuture fut) { assert fut != null; - pending.add(fut); - futs.add(fut); + synchronized (futs) { + futs.add(fut); + } fut.listen(new Listener()); - if (isCancelled()) + if (isCancelled()) { try { fut.cancel(); } catch (IgniteCheckedException e) { onDone(e); } + } } /** @@ -185,7 +207,8 @@ public class GridCompoundFuture extends GridFutureAdapter { * * @param futs Futures to add. */ - public void addAll(@Nullable IgniteInternalFuture... futs) { + @SafeVarargs + public final void addAll(@Nullable IgniteInternalFuture... futs) { addAll(F.asList(futs)); } @@ -195,9 +218,10 @@ public class GridCompoundFuture extends GridFutureAdapter { * @param futs Futures to add. */ public void addAll(@Nullable Iterable> futs) { - if (futs != null) + if (futs != null) { for (IgniteInternalFuture fut : futs) add(fut); + } } /** @@ -219,10 +243,34 @@ public class GridCompoundFuture extends GridFutureAdapter { } /** + * @param flag Flag to CAS. + * @return {@code True} if CAS succeeds. + */ + private boolean casFlag(int flag) { + for (;;) { + int flags0 = flags; + + if ((flags0 & flag) != 0) + return false; + + if (flagsUpd.compareAndSet(this, flags0, flags0 | flag)) + return true; + } + } + + /** + * @param flag Flag to check. + * @return {@code True} if set. + */ + private boolean flagSet(int flag) { + return (flags & flag) != 0; + } + + /** * Mark this future as initialized. */ public void markInitialized() { - if (init.compareAndSet(false, true)) + if (casFlag(INITED)) // Check complete to make sure that we take care // of all the ignored callbacks. checkComplete(); @@ -232,22 +280,14 @@ public class GridCompoundFuture extends GridFutureAdapter { * Check completeness of the future. */ private void checkComplete() { - Throwable err = this.err.get(); - - boolean ignore = ignoreFailure(err); - - if (init.get() && (res.isMarked() || lsnrCalls.get() == futs.sizex() || (err != null && !ignore)) - && finished.compareAndSet(false, true)) { + if (flagSet(INITED) && !isDone() && lsnrCalls == futuresSize()) { try { - if (err == null && rdc != null && !res.isMarked()) - res.compareAndSet(null, rdc.reduce(), false, true); + onDone(rdc != null ? rdc.reduce() : null); } catch (RuntimeException e) { U.error(null, "Failed to execute compound future reducer: " + this, e); onDone(e); - - return; } catch (AssertionError e) { U.error(null, "Failed to execute compound future reducer: " + this, e); @@ -256,8 +296,15 @@ public class GridCompoundFuture extends GridFutureAdapter { throw e; } + } + } - onDone(res.getReference(), ignore ? null : err); + /** + * @return Futures size. + */ + private int futuresSize() { + synchronized (futs) { + return futs.size(); } } @@ -288,7 +335,7 @@ public class GridCompoundFuture extends GridFutureAdapter { "cancelled", isCancelled(), "err", error(), "futs", - F.viewReadOnly(futs, new C1, String>() { + F.viewReadOnly(futures(), new C1, String>() { @Override public String apply(IgniteInternalFuture f) { return Boolean.toString(f.isDone()); } @@ -305,14 +352,12 @@ public class GridCompoundFuture extends GridFutureAdapter { /** {@inheritDoc} */ @Override public void apply(IgniteInternalFuture fut) { - pending.remove(fut); - try { T t = fut.get(); try { - if (rdc != null && !rdc.collect(t) && !res.isMarked()) - res.compareAndSet(null, rdc.reduce(), false, true); + if (rdc != null && !rdc.collect(t)) + onDone(rdc.reduce()); } catch (RuntimeException e) { U.error(null, "Failed to execute compound future reducer: " + this, e); @@ -331,18 +376,20 @@ public class GridCompoundFuture extends GridFutureAdapter { } catch (IgniteTxOptimisticCheckedException | IgniteFutureCancelledCheckedException | ClusterTopologyCheckedException e) { - err.compareAndSet(null, e); + if (!ignoreFailure(e)) + onDone(e); } catch (IgniteCheckedException e) { - if (!ignoreFailure(e)) + if (!ignoreFailure(e)) { U.error(null, "Failed to execute compound future reducer: " + this, e); - err.compareAndSet(null, e); + onDone(e); + } } catch (RuntimeException e) { U.error(null, "Failed to execute compound future reducer: " + this, e); - err.compareAndSet(null, e); + onDone(e); } catch (AssertionError e) { U.error(null, "Failed to execute compound future reducer: " + this, e); @@ -353,7 +400,7 @@ public class GridCompoundFuture extends GridFutureAdapter { throw e; } - lsnrCalls.incrementAndGet(); + lsnrCallsUpd.incrementAndGet(GridCompoundFuture.this); checkComplete(); } @@ -363,4 +410,4 @@ public class GridCompoundFuture extends GridFutureAdapter { return "Compound future listener: " + GridCompoundFuture.this; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java index d93f12e..b3747d7 100644 --- a/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java +++ b/modules/core/src/main/java/org/jsr166/ConcurrentHashMap8.java @@ -3805,4 +3805,4 @@ public class ConcurrentHashMap8 } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java index 75db13c..28e38d7 100644 --- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java +++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedDeque8.java @@ -1735,4 +1735,4 @@ public class ConcurrentLinkedDeque8 } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java index 5b7381e..e8f8e0f 100644 --- a/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java +++ b/modules/core/src/main/java/org/jsr166/ConcurrentLinkedHashMap.java @@ -17,7 +17,6 @@ import java.util.AbstractSet; import java.util.ArrayDeque; import java.util.Collection; import java.util.ConcurrentModificationException; -import java.util.Deque; import java.util.Enumeration; import java.util.HashMap; import java.util.Hashtable; @@ -28,6 +27,9 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV; @@ -264,12 +266,14 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements private volatile V val; /** Reference to a node in queue for fast removal operations. */ + @GridToStringExclude private volatile ConcurrentLinkedDeque8.Node node; /** Modification count of the map for duplicates exclusion. */ private volatile int modCnt; /** Link to the next entry in a bucket */ + @GridToStringExclude private final HashEntry next; /** @@ -332,6 +336,11 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements static HashEntry[] newArray(int i) { return new HashEntry[i]; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HashEntry.class, this, "key", key, "val", val); + } } /** @@ -749,7 +758,7 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements recordInsert(e, (ConcurrentLinkedDeque8)segEntryQ); if (maxCap > 0) - checkRemoveEldestEntrySegment(); + checkRemoveEldestEntrySegment(c); break; @@ -757,7 +766,7 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements segEntryQ.add(e); if (maxCap > 0) - checkRemoveEldestEntrySegment(); + checkRemoveEldestEntrySegment(c); break; @@ -779,23 +788,21 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements } /** - * + * @param cnt Segment entries count. */ - private void checkRemoveEldestEntrySegment() { + private void checkRemoveEldestEntrySegment(int cnt) { assert maxCap > 0; - int rmvCnt = sizex() - maxCap; - - for (int i = 0; i < rmvCnt; i++) { + if (cnt - ((maxCap / segments.length) + 1) > 0) { HashEntry e0 = segEntryQ.poll(); - if (e0 == null) - break; - - removeLocked(e0.key, e0.hash, null /*no need to compare*/, false); + assert e0 != null; - if (sizex() <= maxCap) - break; + removeLocked( + e0.key, + e0.hash, + null /*no need to compare*/, + false); } } @@ -1812,34 +1819,22 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements * @param asc {@code True} for ascending iterator. */ HashIterator(boolean asc) { - // TODO GG-4788 - Need to fix iterators for ConcurrentLinkedHashMap in perSegment mode - if (qPlc != SINGLE_Q) - throw new IllegalStateException("Iterators are not supported in 'perSegmentQueue' modes."); - modCnt = ConcurrentLinkedHashMap.this.modCnt.intValue(); // Init delegate. - delegate = asc ? entryQ.iterator() : entryQ.descendingIterator(); - - advance(); - } + switch (qPlc) { + case SINGLE_Q: + delegate = asc ? entryQ.iterator() : entryQ.descendingIterator(); - /** - * @return Copy of the queue. - */ - private Deque> copyQueue() { - int i = entryQ.sizex(); - - Deque> res = new ArrayDeque<>(i); - - Iterator> iter = entryQ.iterator(); + break; - while (iter.hasNext() && i-- >= 0) - res.add(iter.next()); + default: + assert qPlc == PER_SEGMENT_Q || qPlc == PER_SEGMENT_Q_OPTIMIZED_RMV : qPlc; - assert !iter.hasNext() : "Entries queue has been modified."; + delegate = new HashIteratorDelegate(); + } - return res; + advance(); } /** @@ -1901,6 +1896,130 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements } /** + * + */ + private class HashIteratorDelegate implements Iterator> { + /** */ + private HashEntry[] curTbl; + + /** */ + private int nextSegIdx; + + /** */ + private int nextTblIdx; + + /** */ + private HashEntry next; + + /** */ + private HashEntry next0; + + /** */ + private HashEntry cur; + + /** + * + */ + public HashIteratorDelegate() { + nextSegIdx = segments.length - 1; + nextTblIdx = -1; + + advance(); + } + + /** + * + */ + private void advance() { + if (next0 != null && advanceInBucket(next0, true)) + return; + + while (nextTblIdx >= 0) { + HashEntry bucket = curTbl[nextTblIdx--]; + + if (bucket != null && advanceInBucket(bucket, false)) + return; + } + + while (nextSegIdx >= 0) { + int nextSegIdx0 = nextSegIdx--; + + Segment seg = segments[nextSegIdx0]; + + curTbl = seg.tbl; + + for (int j = curTbl.length - 1; j >= 0; --j) { + HashEntry bucket = curTbl[j]; + + if (bucket != null && advanceInBucket(bucket, false)) { + nextTblIdx = j - 1; + + return; + } + } + } + } + + /** + * @param e Current next. + * @return {@code True} if advance succeeded. + */ + @SuppressWarnings( {"unchecked"}) + private boolean advanceInBucket(@Nullable HashEntry e, boolean skipFirst) { + if (e == null) + return false; + + next0 = e; + + do { + if (!skipFirst) { + next = next0; + + return true; + } + else + skipFirst = false; + } + while ((next0 = next0.next) != null); + + assert next0 == null; + + next = null; + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasNext() { + return next != null; + } + + /** {@inheritDoc} */ + @Override public HashEntry next() { + HashEntry e = next; + + if (e == null) + throw new NoSuchElementException(); + + advance(); + + return e; + } + + /** {@inheritDoc} */ + @Override public void remove() { + if (cur == null) + throw new IllegalStateException(); + + HashEntry e = cur; + + cur = null; + + ConcurrentLinkedHashMap.this.remove(e.key, e.val); + } + } + + /** * Key iterator implementation. */ private final class KeyIterator extends HashIterator implements Iterator, Enumeration { @@ -2154,13 +2273,17 @@ public class ConcurrentLinkedHashMap extends AbstractMap implements * the fastest "natural" evicts for bounded maps. *

* NOTE: Remove operations on map are slower than with other policies. + *

+ * NOTE: Iteration order is not preserved, i.e. iteration goes as if it was ordinary hash map. */ PER_SEGMENT_Q, /** * Instance of {@code GridConcurrentLinkedDequeue} is created for each segment. This gives * faster "natural" evicts for bounded queues and better remove operation times. + *

+ * NOTE: Iteration order is not preserved, i.e. iteration goes as if it was ordinary hash map. */ PER_SEGMENT_Q_OPTIMIZED_RMV } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java index 50ba241..616fd43 100644 --- a/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/GridCacheAffinityBackupsSelfTest.java @@ -26,12 +26,18 @@ import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; /** * Tests affinity function with different number of backups. */ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest { + /** */ + private final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + /** Number of backups. */ private int backups; @@ -45,6 +51,8 @@ public class GridCacheAffinityBackupsSelfTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + CacheConfiguration ccfg = new CacheConfiguration(); ccfg.setCacheMode(CacheMode.PARTITIONED); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 3a530f2..1d79e20 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -4696,7 +4696,7 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract TransactionIsolation txIsolation) throws Exception { - log.info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']'); + info("Test tx skip store [concurrency=" + txConcurrency + ", isolation=" + txIsolation + ']'); cache.removeAll(data.keySet()); checkEmpty(cache, cacheSkipStore); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java index 6a0b9ad..19e49f3 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheMissingCommitVersionSelfTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; @@ -32,13 +33,14 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; /** * */ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTest { /** */ - private volatile Integer failedKey; + private volatile boolean putFailed; /** */ private String maxCompletedTxCount; @@ -67,6 +69,7 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes ccfg.setCacheMode(PARTITIONED); ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(FULL_SYNC); cfg.setCacheConfiguration(ccfg); @@ -90,43 +93,48 @@ public class GridCacheMissingCommitVersionSelfTest extends GridCommonAbstractTes final AtomicInteger keyStart = new AtomicInteger(); + final ConcurrentLinkedDeque q = new ConcurrentLinkedDeque<>(); + GridTestUtils.runMultiThreaded(new Callable() { @Override public Object call() throws Exception { int start = keyStart.getAndAdd(KEYS_PER_THREAD); - for (int i = 0; i < KEYS_PER_THREAD && failedKey == null; i++) { + for (int i = 0; i < KEYS_PER_THREAD && !putFailed; i++) { int key = start + i; try { cache.put(key, 1); } catch (Exception e) { - log.info("Put failed: " + e); + log.info("Put failed [err=" + e + ", i=" + i + ']'); + + putFailed = true; - failedKey = key; + q.add(key); } } - return null; } }, 10, "put-thread"); - assertNotNull("Test failed to provoke 'missing commit version' error.", failedKey); + assertTrue("Test failed to provoke 'missing commit version' error.", putFailed); - log.info("Trying to update " + failedKey); + for (Integer key : q) { + log.info("Trying to update " + key); - IgniteCache asyncCache = cache.withAsync(); + IgniteCache asyncCache = cache.withAsync(); - asyncCache.put(failedKey, 2); + asyncCache.put(key, 2); - IgniteFuture fut = asyncCache.future(); + IgniteFuture fut = asyncCache.future(); - try { - fut.get(5000); - } - catch (IgniteFutureTimeoutException ignore) { - fail("Put failed to finish in 5s."); + try { + fut.get(5000); + } + catch (IgniteFutureTimeoutException ignore) { + fail("Put failed to finish in 5s: " + key); + } } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java index abb2767..b93acf5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTestEntryEx.java @@ -541,7 +541,6 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Nullable IgniteInternalTx tx, UUID evtNodeId, UUID affNodeId, - boolean writeThrough, boolean retval, boolean evt, boolean metrics, @@ -894,4 +893,4 @@ public class GridCacheTestEntryEx extends GridMetadataAwareAdapter implements Gr @Override public void onUnlock() { // No-op. } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java index a2440e2..ad51600 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/GridCachePartitionedMultiNodeFullApiSelfTest.java @@ -513,4 +513,4 @@ public class GridCachePartitionedMultiNodeFullApiSelfTest extends GridCacheParti assertTrue(((IgniteKernal)ignite).internalCache().context().isNear()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java index a2308c6..8f5e07b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.java @@ -1090,4 +1090,4 @@ public class GridEventConsumeSelfTest extends GridCommonAbstractTest { assertEquals("Not stopped IDs: " + notStopped, 0, notStopped.size()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java index e6dc7e6..8ce7ae3 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridBoundedConcurrentLinkedHashMapSelfTest.java @@ -52,4 +52,4 @@ public class GridBoundedConcurrentLinkedHashMapSelfTest extends GridCommonAbstra assert it.next() == 9; assert it.next() == 10; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java index a09ba15..7bcbd07 100644 --- a/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/lang/utils/GridConcurrentLinkedHashMapSelfTest.java @@ -19,13 +19,18 @@ package org.apache.ignite.lang.utils; import java.util.Date; import java.util.Enumeration; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jsr166.ConcurrentLinkedHashMap; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q; +import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV; + /** * This class tests basic contracts of {@code ConcurrentLinkedHashMap}. */ @@ -264,4 +269,59 @@ public class GridConcurrentLinkedHashMapSelfTest extends GridCommonAbstractTest assert nextVal == -1 : "Unexpected value: " + nextVal; } -} \ No newline at end of file + + /** + * + */ + public void testIterationInPerSegmentModes() { + checkIteration(PER_SEGMENT_Q); + checkIteration(PER_SEGMENT_Q_OPTIMIZED_RMV); + } + + /** + * @param plc Policy. + */ + private void checkIteration(ConcurrentLinkedHashMap.QueuePolicy plc) { + ConcurrentLinkedHashMap map = + new ConcurrentLinkedHashMap<>(10, + 0.75f, + 16, + 0, + plc); + + Map map0 = new HashMap<>(); + + int cnt = 0; + + for (int i = 0; i < 100_000; i++) { + int key = ThreadLocalRandom.current().nextInt(15000); + int val = ThreadLocalRandom.current().nextInt(15000); + + Integer rmv0 = map0.put(key, val); + + if (rmv0 == null) + cnt++; + + Integer rmv = map.put(key, val); + + assertEquals(rmv0, rmv); + } + + int checkCnt = 0; + + for (Map.Entry e : map.entrySet()) { + checkCnt++; + + Integer rmv = map0.remove(e.getKey()); + + assertNotNull(rmv); + assertEquals(rmv, e.getValue()); + } + + assertEquals(cnt, checkCnt); + + info("Puts count: " + cnt); + + assert map0.isEmpty() : map0; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 71f3ee3..c19e718 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -95,7 +95,7 @@ import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_REA * Super class for all common tests. */ public abstract class GridCommonAbstractTest extends GridAbstractTest { - /**Cache peek modes array that consist of only ONHEAP mode. */ + /** Cache peek modes array that consist of only ONHEAP mode. */ protected static final CachePeekMode[] ONHEAP_PEEK_MODES = new CachePeekMode[] {CachePeekMode.ONHEAP}; /** @@ -1087,4 +1087,4 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { } } } -} \ No newline at end of file +}