Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9A1ED181D9 for ; Fri, 20 Nov 2015 14:21:02 +0000 (UTC) Received: (qmail 19786 invoked by uid 500); 20 Nov 2015 14:21:02 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 19681 invoked by uid 500); 20 Nov 2015 14:21:02 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 19471 invoked by uid 99); 20 Nov 2015 14:21:02 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2015 14:21:02 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5320BE00DB; Fri, 20 Nov 2015 14:21:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.apache.org Date: Fri, 20 Nov 2015 14:21:10 -0000 Message-Id: In-Reply-To: <2bb31ebc6c824427884020c2504481c5@git.apache.org> References: <2bb31ebc6c824427884020c2504481c5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [09/25] ignite git commit: Optimization for single key cache 'get' operation. http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java new file mode 100644 index 0000000..ba0081c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class GridNearSingleGetResponse extends GridCacheMessage implements GridCacheDeployable { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public static final int INVALID_PART_FLAG_MASK = 0x1; + + /** */ + public static final int CONTAINS_VAL_FLAG_MASK = 0x2; + + /** Future ID. */ + private IgniteUuid futId; + + /** */ + private Message res; + + /** */ + private AffinityTopologyVersion topVer; + + /** Error. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** Serialized error. */ + private byte[] errBytes; + + /** */ + private byte flags; + + /** + * Empty constructor required for {@link Message}. + */ + public GridNearSingleGetResponse() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param futId Future ID. + * @param topVer Topology version. + * @param res Result. + * @param invalidPartitions {@code True} if invalid partitions error occurred. + * @param addDepInfo Deployment info. + */ + public GridNearSingleGetResponse( + int cacheId, + IgniteUuid futId, + AffinityTopologyVersion topVer, + @Nullable Message res, + boolean invalidPartitions, + boolean addDepInfo + ) { + assert futId != null; + + this.cacheId = cacheId; + this.futId = futId; + this.topVer = topVer; + this.res = res; + this.addDepInfo = addDepInfo; + + if (invalidPartitions) + flags = (byte)(flags | INVALID_PART_FLAG_MASK); + } + + /** + * @param err Error. + */ + public void error(IgniteCheckedException err) { + this.err = err; + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException error() { + return err; + } + + /** + * @return Topology version. + */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return {@code True} if invalid partitions error occurred. + */ + public boolean invalidPartitions() { + return (flags & INVALID_PART_FLAG_MASK) != 0; + } + + /** + * @return Results for request with set flag {@link GridNearSingleGetRequest#skipValues()}. + */ + public boolean containsValue() { + return (flags & CONTAINS_VAL_FLAG_MASK) != 0; + } + + /** + * + */ + public void setContainsValue() { + flags = (byte)(flags | CONTAINS_VAL_FLAG_MASK); + } + + /** + * @return Result. + */ + public Message result() { + return res; + } + + /** + * @return Future ID. + */ + public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (res != null) { + GridCacheContext cctx = ctx.cacheContext(cacheId); + + if (res instanceof CacheObject) + prepareMarshalCacheObject((CacheObject) res, cctx); + else if (res instanceof CacheVersionedValue) + ((CacheVersionedValue)res).prepareMarshal(cctx.cacheObjectContext()); + else if (res instanceof GridCacheEntryInfo) + ((GridCacheEntryInfo)res).marshal(cctx); + } + + if (err != null) + errBytes = ctx.marshaller().marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (res != null) { + GridCacheContext cctx = ctx.cacheContext(cacheId()); + + if (res instanceof CacheObject) + ((CacheObject)res).finishUnmarshal(cctx.cacheObjectContext(), ldr); + else if (res instanceof CacheVersionedValue) + ((CacheVersionedValue)res).finishUnmarshal(cctx, ldr); + else if (res instanceof GridCacheEntryInfo) + ((GridCacheEntryInfo)res).unmarshal(cctx, ldr); + } + + if (errBytes != null && err == null) + err = ctx.marshaller().unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeIgniteUuid("futId", futId)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeMessage("res", res)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + futId = reader.readIgniteUuid("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + res = reader.readMessage("res"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearSingleGetResponse.class); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 117; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 8; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearSingleGetResponse.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 1a26028..fd3d056 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -423,7 +423,7 @@ public class GridNearTransactionalCache extends GridNearCacheAdapter assert nodeId != null; assert res != null; - GridNearLockFuture fut = (GridNearLockFuture)ctx.mvcc().future(res.version(), + GridNearLockFuture fut = (GridNearLockFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId()); if (fut != null) http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 9c022b2..102cc4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -79,7 +79,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu private GridCacheSharedContext cctx; /** Future ID. */ - private IgniteUuid futId; + private final IgniteUuid futId; /** Transaction. */ @GridToStringInclude @@ -125,26 +125,6 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return tx.xidVersion(); - } - - /** - * @return Involved nodes. - */ - @Override public Collection nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { for (IgniteInternalFuture fut : futures()) if (isMini(fut)) { @@ -298,7 +278,7 @@ public final class GridNearTxFinishFuture extends GridCompoundIdentityFutu } // Don't forget to clean up. - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeFuture(futId); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index b92be31..1c01e4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -332,7 +332,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { boolean readThrough, boolean async, final Collection keys, - boolean skipVals, + final boolean skipVals, final boolean needVer, final GridInClosure3 c ) { @@ -361,35 +361,70 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { }); } else if (cacheCtx.isColocated()) { - return cacheCtx.colocated().loadAsync( - keys, - readThrough, - /*force primary*/needVer, - topologyVersion(), - CU.subjectId(this, cctx), - resolveTaskName(), - /*deserializePortable*/false, - accessPolicy(cacheCtx, keys), - skipVals, - /*can remap*/true, - needVer, - /*keepCacheObject*/true - ).chain(new C1>, Void>() { - @Override public Void apply(IgniteInternalFuture> f) { - try { - Map map = f.get(); - - processLoaded(map, keys, needVer, c); - - return null; + if (keys.size() == 1) { + final KeyCacheObject key = F.first(keys); + + return cacheCtx.colocated().loadAsync( + key, + readThrough, + /*force primary*/needVer, + topologyVersion(), + CU.subjectId(this, cctx), + resolveTaskName(), + /*deserializePortable*/false, + accessPolicy(cacheCtx, keys), + skipVals, + /*can remap*/true, + needVer, + /*keepCacheObject*/true + ).chain(new C1, Void>() { + @Override public Void apply(IgniteInternalFuture f) { + try { + Object val = f.get(); + + processLoaded(key, val, needVer, skipVals, c); + + return null; + } + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } } - catch (Exception e) { - setRollbackOnly(); - - throw new GridClosureException(e); + }); + } + else { + return cacheCtx.colocated().loadAsync( + keys, + readThrough, + /*force primary*/needVer, + topologyVersion(), + CU.subjectId(this, cctx), + resolveTaskName(), + /*deserializePortable*/false, + accessPolicy(cacheCtx, keys), + skipVals, + /*can remap*/true, + needVer, + /*keepCacheObject*/true + ).chain(new C1>, Void>() { + @Override public Void apply(IgniteInternalFuture> f) { + try { + Map map = f.get(); + + processLoaded(map, keys, needVer, c); + + return null; + } + catch (Exception e) { + setRollbackOnly(); + + throw new GridClosureException(e); + } } - } - }); + }); + } } else { assert cacheCtx.isLocal(); @@ -409,29 +444,45 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final Collection keys, boolean needVer, GridInClosure3 c) { - for (KeyCacheObject key : keys) { - Object val = map.get(key); + for (KeyCacheObject key : keys) + processLoaded(key, map.get(key), needVer, false, c); + } - if (val != null) { - Object v; - GridCacheVersion ver; + /** + * @param key Key. + * @param val Value. + * @param needVer If {@code true} version is required for loaded values. + * @param skipVals Skip values flag. + * @param c Closure. + */ + private void processLoaded( + KeyCacheObject key, + @Nullable Object val, + boolean needVer, + boolean skipVals, + GridInClosure3 c) { + if (val != null) { + Object v; + GridCacheVersion ver; - if (needVer) { - T2 t = (T2)val; + if (needVer) { + T2 t = (T2)val; - v = t.get1(); - ver = t.get2(); - } - else { - v = val; - ver = null; - } - - c.apply(key, v, ver); + v = t.get1(); + ver = t.get2(); } - else + else { + v = val; + ver = null; + } + + if (skipVals && v == Boolean.FALSE) c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER); + else + c.apply(key, v, ver); } + else + c.apply(key, null, IgniteTxEntry.SER_READ_EMPTY_ENTRY_VER); } /** {@inheritDoc} */ @@ -771,7 +822,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (fut == null && !commitFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, true))) return commitFut.get(); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); final IgniteInternalFuture prepareFut = prepFut.get(); @@ -816,7 +867,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (!rollbackFut.compareAndSet(null, fut = new GridNearTxFinishFuture<>(cctx, this, false))) return rollbackFut.get(); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); IgniteInternalFuture prepFut = this.prepFut.get(); @@ -957,7 +1008,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/true); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); if (prep == null || prep.isDone()) { assert prep != null || optimistic(); @@ -1016,7 +1067,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { final GridDhtTxFinishFuture fut = new GridDhtTxFinishFuture<>(cctx, this, /*commit*/false); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); IgniteInternalFuture prep = prepFut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index cfaadc9..52cad91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -25,7 +25,7 @@ import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; @@ -48,8 +48,8 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.NOO /** * Common code for tx prepare in optimistic and pessimistic modes. */ -public abstract class GridNearTxPrepareFutureAdapter extends GridCompoundFuture - implements GridCacheFuture { +public abstract class GridNearTxPrepareFutureAdapter extends + GridCompoundFuture implements GridCacheMvccFuture { /** Logger reference. */ protected static final AtomicReference logRef = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 87da9a1..544d5b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -231,8 +231,8 @@ public class GridLocalCache extends GridCacheAdapter { /** * @param fut Clears future from cache. */ - void onFutureDone(GridCacheFuture fut) { - if (ctx.mvcc().removeFuture(fut)) { + void onFutureDone(GridLocalLockFuture fut) { + if (ctx.mvcc().removeMvccFuture(fut)) { if (log().isDebugEnabled()) log().debug("Explicitly removed future from map of futures: " + fut); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index cb14b4c..d392d53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -154,11 +154,6 @@ public final class GridLocalLockFuture extends GridFutureAdapter } /** {@inheritDoc} */ - @Override public Collection nodes() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { return false; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 0e5657b..a9846ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -477,7 +477,7 @@ public class IgniteTxHandler { */ private void processNearTxPrepareResponse(UUID nodeId, GridNearTxPrepareResponse res) { GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)ctx.mvcc() - .future(res.version(), res.futureId()); + .mvccFuture(res.version(), res.futureId()); if (fut == null) { U.warn(log, "Failed to find future for prepare response [sender=" + nodeId + ", res=" + res + ']'); @@ -495,8 +495,7 @@ public class IgniteTxHandler { private void processNearTxFinishResponse(UUID nodeId, GridNearTxFinishResponse res) { ctx.tm().onFinishedRemote(nodeId, res.threadId()); - GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future( - res.xid(), res.futureId()); + GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -513,7 +512,7 @@ public class IgniteTxHandler { * @param res Response. */ private void processDhtTxPrepareResponse(UUID nodeId, GridDhtTxPrepareResponse res) { - GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().future(res.version(), res.futureId()); + GridDhtTxPrepareFuture fut = (GridDhtTxPrepareFuture)ctx.mvcc().mvccFuture(res.version(), res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -534,8 +533,7 @@ public class IgniteTxHandler { assert res != null; if (res.checkCommitted()) { - GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future( - res.xid(), res.futureId()); + GridNearTxFinishFuture fut = (GridNearTxFinishFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -547,8 +545,7 @@ public class IgniteTxHandler { fut.onResult(nodeId, res); } else { - GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().future( - res.xid(), res.futureId()); + GridDhtTxFinishFuture fut = (GridDhtTxFinishFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -566,7 +563,8 @@ public class IgniteTxHandler { * @param req Request. * @return Future. */ - @Nullable public IgniteInternalFuture processNearTxFinishRequest(UUID nodeId, GridNearTxFinishRequest req) { + @Nullable public IgniteInternalFuture processNearTxFinishRequest(UUID nodeId, + GridNearTxFinishRequest req) { return finish(nodeId, null, req); } @@ -1359,8 +1357,7 @@ public class IgniteTxHandler { if (log.isDebugEnabled()) log.debug("Processing check prepared transaction response [nodeId=" + nodeId + ", res=" + res + ']'); - GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc(). - future(res.version(), res.futureId()); + GridCacheTxRecoveryFuture fut = (GridCacheTxRecoveryFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 9e44b10..ecb0595 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1372,7 +1372,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (txEntry != null) { CacheObject val = txEntry.value(); - // Read value from locked entry in group-lock transaction as well. if (txEntry.hasValue()) { if (!F.isEmpty(txEntry.entryProcessors())) val = txEntry.applyEntryProcessors(val); @@ -2224,6 +2223,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** * @param cacheCtx Cache context. * @param keys Keys to load. + * @param filter Filter. * @param ret Return value. * @param needReadVer Read version flag. * @param singleRmv {@code True} for single remove operation. http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 67bca51..247ccaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1764,7 +1764,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { tx.originatingNodeId(), tx.transactionNodes()); - cctx.mvcc().addFuture(fut); + cctx.mvcc().addFuture(fut, fut.futureId()); if (log.isDebugEnabled()) log.debug("Checking optimistic transaction state on remote nodes [tx=" + tx + ", fut=" + fut + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 6131f54..6cf10f4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.util.typedef.CI1; @@ -765,6 +766,12 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac } }; + IgniteInClosure> getAllOp = new CI1>() { + @Override public void apply(IgniteCache cache) { + cache.getAll(F.asSet(1, 2)); + } + }; + int cnt = 0; for (CacheAtomicityMode atomicityMode : CacheAtomicityMode.values()) { @@ -799,7 +806,9 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac log.info("Test cache get [atomicity=" + atomicityMode + ", syncMode=" + syncMode + ']'); - checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getOp); + checkOperationInProgressFails(client, ccfg, GridNearSingleGetResponse.class, getOp); + + checkOperationInProgressFails(client, ccfg, GridNearGetResponse.class, getAllOp); client.destroyCache(ccfg.getName()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java index 1d79e20..e8e86e9 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAbstractFullApiSelfTest.java @@ -382,6 +382,81 @@ public abstract class GridCacheAbstractFullApiSelfTest extends GridCacheAbstract /** * @throws Exception If failed. */ + public void testContainsKeyTx() throws Exception { + if (!txEnabled()) + return; + + IgniteCache cache = jcache(); + + IgniteTransactions txs = ignite(0).transactions(); + + for (int i = 0; i < 10; i++) { + String key = String.valueOf(i); + + try (Transaction tx = txs.txStart()) { + assertNull(key, cache.get(key)); + + assertFalse(cache.containsKey(key)); + + tx.commit(); + } + + try (Transaction tx = txs.txStart()) { + assertNull(key, cache.get(key)); + + cache.put(key, i); + + assertTrue(cache.containsKey(key)); + + tx.commit(); + } + } + } + + /** + * @throws Exception If failed. + */ + public void testContainsKeysTx() throws Exception { + if (!txEnabled()) + return; + + IgniteCache cache = jcache(); + + IgniteTransactions txs = ignite(0).transactions(); + + Set keys = new HashSet<>(); + + for (int i = 0; i < 10; i++) { + String key = String.valueOf(i); + + keys.add(key); + } + + try (Transaction tx = txs.txStart()) { + for (String key : keys) + assertNull(key, cache.get(key)); + + assertFalse(cache.containsKeys(keys)); + + tx.commit(); + } + + try (Transaction tx = txs.txStart()) { + for (String key : keys) + assertNull(key, cache.get(key)); + + for (String key : keys) + cache.put(key, 0); + + assertTrue(cache.containsKeys(keys)); + + tx.commit(); + } + } + + /** + * @throws Exception If failed. + */ public void testRemoveInExplicitLocks() throws Exception { if (lockingEnabled()) { IgniteCache cache = jcache(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java index 1ef77f2..45c8c2c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentTxMultiNodeTest.java @@ -613,21 +613,6 @@ public class GridCacheConcurrentTxMultiNodeTest extends GridCommonAbstractTest { X.println("Near entry [grid="+ g.name() + ", key=" + k + ", entry=" + nearEntry); X.println("DHT entry [grid=" + g.name() + ", key=" + k + ", entry=" + dhtEntry); - - GridCacheMvccCandidate nearCand = - nearEntry == null ? null : F.first(nearEntry.localCandidates()); - - if (nearCand != null) - X.println("Near futures: " + - nearEntry.context().mvcc().futures(nearCand.version())); - - GridCacheMvccCandidate dhtCand = - dhtEntry == null ? null : F.first(dhtEntry.localCandidates()); - - if (dhtCand != null) - X.println("Dht futures: " + - dhtEntry.context().mvcc().futures(dhtCand.version())); - } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java index cbfc97b..2cb4a00 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedGetSelfTest.java @@ -28,6 +28,7 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; @@ -228,7 +229,7 @@ public class GridCachePartitionedGetSelfTest extends GridCommonAbstractTest { @Override public void onMessage(UUID nodeId, Object msg) { info("Received message from node [nodeId=" + nodeId + ", msg=" + msg + ']'); - if (msg instanceof GridNearGetRequest) { + if (msg instanceof GridNearSingleGetRequest) { info("Setting flag: " + System.identityHashCode(received)); received.set(true); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java index 8ddee0c..e5cff5a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAbstractStopBusySelfTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import java.io.PrintWriter; import java.io.StringWriter; import java.util.List; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -37,6 +38,8 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -218,7 +221,7 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst * @throws Exception If failed. */ public void testGet() throws Exception { - bannedMsg.set(GridNearGetRequest.class); + bannedMsg.set(GridNearSingleGetRequest.class); executeTest(new Callable() { /** {@inheritDoc} */ @@ -235,6 +238,28 @@ public abstract class IgniteCacheAbstractStopBusySelfTest extends GridCommonAbst } /** + * @throws Exception If failed. + */ + public void testGetAll() throws Exception { + bannedMsg.set(GridNearGetRequest.class); + + executeTest(new Callable() { + /** {@inheritDoc} */ + @Override public Integer call() throws Exception { + info("Start operation."); + + Set keys = F.asSet(1, 2, 3); + + clientCache().getAll(keys); + + info("Stop operation."); + + return null; + } + }); + } + + /** * * @param call Closure executing cache operation. * @throws Exception If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index f4423f7..9c1abc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheException; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; @@ -30,6 +31,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; /** @@ -37,7 +39,10 @@ import org.apache.ignite.internal.util.typedef.X; */ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest { /** Allows to change behavior of readExternal method. */ - protected static AtomicInteger readCnt = new AtomicInteger(); + protected static final AtomicInteger readCnt = new AtomicInteger(); + + /** Allows to change behavior of readExternal method. */ + protected static final AtomicInteger valReadCnt = new AtomicInteger(); /** Iterable key. */ protected static int key = 0; @@ -86,71 +91,40 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes return cfg; } - /** Test key 1. */ - protected static class TestKey implements Externalizable { - /** Field. */ - @QuerySqlField(index = true) - private String field; - - /** - * @param field Test key 1. - */ - public TestKey(String field) { - this.field = field; - } - - /** Test key 1. */ - public TestKey() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - TestKey key = (TestKey)o; - - return !(field != null ? !field.equals(key.field) : key.field != null); - } + /** + * Sends put atomically and handles fail. + * + * @param k Key. + */ + protected void failAtomicPut(int k) { + try { + jcache(0).put(new TestKey(String.valueOf(k)), ""); - /** {@inheritDoc} */ - @Override public int hashCode() { - return field != null ? field.hashCode() : 0; + assert false : "p2p marshalling failed, but error response was not sent"; } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(field); + catch (CacheException e) { + assert X.hasCause(e, IOException.class); } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - field = (String)in.readObject(); - - if (readCnt.decrementAndGet() <= 0) - throw new IOException("Class can not be unmarshalled."); - } + assert readCnt.get() == 0; //ensure we have read count as expected. } /** - * Sends put atomically and handles fail. + * Sends get atomically and handles fail. * * @param k Key. */ - protected void failAtomicPut(int k) { + protected void failGetAll(int k) { try { - jcache(0).put(new TestKey(String.valueOf(k)), ""); + Set keys = F.asSet(new TestKey(String.valueOf(k))); + + jcache(0).getAll(keys); assert false : "p2p marshalling failed, but error response was not sent"; } catch (CacheException e) { assert X.hasCause(e, IOException.class); } - - assert readCnt.get() == 0; //ensure we have read count as expected. } /** @@ -158,7 +132,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes * * @param k Key. */ - protected void failAtomicGet(int k) { + protected void failGet(int k) { try { jcache(0).get(new TestKey(String.valueOf(k))); @@ -175,38 +149,132 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes * @throws Exception If failed. */ public void testResponseMessageOnUnmarshallingFailed() throws Exception { - //GridNearAtomicUpdateRequest unmarshalling failed test + // GridNearAtomicUpdateRequest unmarshalling failed test. readCnt.set(1); failAtomicPut(++key); - //Check that cache is empty. + // Check that cache is empty. readCnt.set(Integer.MAX_VALUE); assert jcache(0).get(new TestKey(String.valueOf(key))) == null; - //GridDhtAtomicUpdateRequest unmarshalling failed test + // GridDhtAtomicUpdateRequest unmarshalling failed test. readCnt.set(2); failAtomicPut(++key); - //Check that cache is not empty. + // Check that cache is not empty. readCnt.set(Integer.MAX_VALUE); assert jcache(0).get(new TestKey(String.valueOf(key))) != null; - //GridNearGetRequest unmarshalling failed test + // GridNearGetRequest unmarshalling failed test. readCnt.set(1); - failAtomicGet(++key); + failGetAll(++key); - //GridNearGetResponse unmarshalling failed test + // GridNearGetResponse unmarshalling failed test. readCnt.set(Integer.MAX_VALUE); jcache(0).put(new TestKey(String.valueOf(++key)), ""); readCnt.set(2); - failAtomicGet(key); + failGetAll(key); + + readCnt.set(Integer.MAX_VALUE); + valReadCnt.set(Integer.MAX_VALUE); + + jcache(0).put(new TestKey(String.valueOf(++key)), new TestValue()); + + assertNotNull(new TestKey(String.valueOf(key))); + + // GridNearSingleGetRequest unmarshalling failed. + readCnt.set(1); + + failGet(key); + + // GridNearSingleGetRequest unmarshalling failed. + valReadCnt.set(1); + readCnt.set(2); + + failGet(key); + } + + /** + * Test key. + */ + protected static class TestKey implements Externalizable { + /** Field. */ + @QuerySqlField(index = true) + private String field; + + /** + * Required by {@link Externalizable}. + */ + public TestKey() { + // No-op. + } + + /** + * @param field Test key 1. + */ + public TestKey(String field) { + this.field = field; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TestKey key = (TestKey)o; + + return !(field != null ? !field.equals(key.field) : key.field != null); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return field != null ? field.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(field); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + field = (String)in.readObject(); + + if (readCnt.decrementAndGet() <= 0) + throw new IOException("Class can not be unmarshalled."); + } + } + + /** + * Test value. + */ + protected static class TestValue implements Externalizable { + /** + * Required by {@link Externalizable}. + */ + public TestValue() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + if (valReadCnt.decrementAndGet() <= 0) + throw new IOException("Class can not be unmarshalled."); + } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java index 53ac648..c005945 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheGetFutureHangsSelfTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -29,6 +30,7 @@ import org.apache.ignite.cache.CacheMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; @@ -135,12 +137,16 @@ public class CacheGetFutureHangsSelfTest extends GridCommonAbstractTest { @Override public void run() { T2 ignite; + Set keys = F.asSet(1, 2, 3, 4, 5); + while ((ignite = randomNode()) != null) { IgniteCache cache = ignite.get1().cache(null); for (int i = 0; i < 100; i++) cache.containsKey(ThreadLocalRandom.current().nextInt(100_000)); + cache.containsKeys(keys); + assertTrue(nodes.compareAndSet(ignite.get2(), null, ignite.get1())); try { http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java index 6f0565b..dbd8758 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheAbstractNodeRestartSelfTest.java @@ -661,6 +661,8 @@ public abstract class GridCacheAbstractNodeRestartSelfTest extends GridCommonAbs // It is ok if primary node leaves grid. } + cache.get(key); + int c = putCntr.incrementAndGet(); if (c % logFreq == 0) http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java new file mode 100644 index 0000000..42b5ee3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheSingleGetMessageTest.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.IgniteSpiException; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * + */ +public class IgniteCacheSingleGetMessageTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int SRVS = 4; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setClientMode(client); + + TestCommunicationSpi commSpi = new TestCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(SRVS); + + client = true; + + startGridsMultiThreaded(SRVS, 1); + + client = false; + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + super.afterTestsStopped(); + + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testSingleGetMessage() throws Exception { + assertFalse(ignite(0).configuration().isClientMode()); + assertTrue(ignite(SRVS).configuration().isClientMode()); + + List> ccfgs = cacheConfigurations(); + + for (int i = 0; i < ccfgs.size(); i++) { + CacheConfiguration ccfg = ccfgs.get(i); + + ccfg.setName("cache-" + i); + + log.info("Test cache: " + i); + + ignite(0).createCache(ccfg); + + try { + IgniteCache srvCache = ignite(0).cache(ccfg.getName()); + IgniteCache clientCache = ignite(SRVS).cache(ccfg.getName()); + + Integer key = nearKey(clientCache); + + checkSingleGetMessage(clientCache, key, false); + + if (ccfg.getBackups() > 0) { + key = backupKeys(srvCache, 1, 100_000).get(0); + + checkSingleGetMessage(srvCache, key, true); + } + + if (ccfg.getCacheMode() != REPLICATED) { + key = nearKeys(srvCache, 1, 200_000).get(0); + + checkSingleGetMessage(srvCache, key, false); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + } + + /** + * @param cache Cache. + * @param key Key. + * @param backup {@code True} if given key is backup key. + * @throws Exception If failed. + */ + @SuppressWarnings("unchecked") + private void checkSingleGetMessage(IgniteCache cache, + Integer key, + boolean backup) throws Exception { + CacheConfiguration ccfg = cache.getConfiguration(CacheConfiguration.class); + + Ignite node = cache.unwrap(Ignite.class); + + TestCommunicationSpi spi = (TestCommunicationSpi)node.configuration().getCommunicationSpi(); + + spi.record(GridNearSingleGetRequest.class); + + Ignite primary = primaryNode(key, cache.getName()); + + assertNotSame(node, primary); + + TestCommunicationSpi primarySpi = (TestCommunicationSpi)primary.configuration().getCommunicationSpi(); + + primarySpi.record(GridNearSingleGetResponse.class); + + assertNull(cache.get(key)); + + checkMessages(spi, primarySpi); + + assertFalse(cache.containsKey(key)); + + checkMessages(spi, primarySpi); + + cache.put(key, 1); + + assertNotNull(cache.get(key)); + + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); + + assertTrue(cache.containsKey(key)); + + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); + + if (ccfg.getAtomicityMode() == TRANSACTIONAL) { + cache.remove(key); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + assertNull(cache.get(key)); + + tx.commit(); + } + + checkMessages(spi, primarySpi); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + assertFalse(cache.containsKey(key)); + + tx.commit(); + } + + checkMessages(spi, primarySpi); + + cache.put(key, 1); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + assertNotNull(cache.get(key)); + + tx.commit(); + } + + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); + + try (Transaction tx = node.transactions().txStart(OPTIMISTIC, REPEATABLE_READ)) { + assertTrue(cache.containsKey(key)); + + tx.commit(); + } + + if (backup) + checkNoMessages(spi, primarySpi); + else + checkMessages(spi, primarySpi); + } + } + + /** + * @param spi Near node SPI. + * @param primarySpi Primary node SPI. + */ + private void checkMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) { + List msgs = spi.recordedMessages(); + + assertEquals(1, msgs.size()); + assertTrue(msgs.get(0) instanceof GridNearSingleGetRequest); + + msgs = primarySpi.recordedMessages(); + + assertEquals(1, msgs.size()); + assertTrue(msgs.get(0) instanceof GridNearSingleGetResponse); + } + + /** + * @param spi Near node SPI. + * @param primarySpi Primary node SPI. + */ + private void checkNoMessages(TestCommunicationSpi spi, TestCommunicationSpi primarySpi) { + List msgs = spi.recordedMessages(); + assertEquals(0, msgs.size()); + + msgs = primarySpi.recordedMessages(); + assertEquals(0, msgs.size()); + } + + /** + * @return Cache configurations to test. + */ + private List> cacheConfigurations() { + List> ccfgs = new ArrayList<>(); + + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, FULL_SYNC, 0)); + ccfgs.add(cacheConfiguration(PARTITIONED, TRANSACTIONAL, FULL_SYNC, 1)); + ccfgs.add(cacheConfiguration(REPLICATED, TRANSACTIONAL, FULL_SYNC, 0)); + + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, FULL_SYNC, 0)); + ccfgs.add(cacheConfiguration(PARTITIONED, ATOMIC, FULL_SYNC, 1)); + ccfgs.add(cacheConfiguration(REPLICATED, ATOMIC, FULL_SYNC, 0)); + + return ccfgs; + } + + /** + * @param cacheMode Cache mode. + * @param atomicityMode Cache atomicity mode. + * @param syncMode Write synchronization mode. + * @param backups Number of backups. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration( + CacheMode cacheMode, + CacheAtomicityMode atomicityMode, + CacheWriteSynchronizationMode syncMode, + int backups) { + CacheConfiguration ccfg = new CacheConfiguration<>(); + + ccfg.setCacheMode(cacheMode); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setAtomicityMode(TRANSACTIONAL); + ccfg.setWriteSynchronizationMode(syncMode); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + private static class TestCommunicationSpi extends TcpCommunicationSpi { + /** */ + private Class recordCls; + + /** */ + private List recordedMsgs = new ArrayList<>(); + + /** {@inheritDoc} */ + @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure ackC) + throws IgniteSpiException { + if (msg instanceof GridIoMessage) { + Object msg0 = ((GridIoMessage)msg).message(); + + synchronized (this) { + if (recordCls != null && msg0.getClass().equals(recordCls)) + recordedMsgs.add(msg0); + } + } + + super.sendMessage(node, msg, ackC); + } + + /** + * @param recordCls Message class to record. + */ + void record(@Nullable Class recordCls) { + synchronized (this) { + this.recordCls = recordCls; + } + } + + /** + * @return Recorded messages. + */ + List recordedMessages() { + synchronized (this) { + List msgs = recordedMsgs; + + recordedMsgs = new ArrayList<>(); + + return msgs; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java index 319aa56..2fa43be 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheReplicatedMetricsSelfTest.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.replicated; import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.processors.cache.GridCacheTransactionalAbstractMetricsSelfTest; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -30,14 +29,6 @@ public class GridCacheReplicatedMetricsSelfTest extends GridCacheTransactionalAb /** */ private static final int GRID_CNT = 2; - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration c = super.getConfiguration(gridName); - - c.getTransactionConfiguration().setTxSerializableEnabled(true); - - return c; - } - /** {@inheritDoc} */ @Override protected CacheConfiguration cacheConfiguration(String gridName) throws Exception { CacheConfiguration cfg = super.cacheConfiguration(gridName); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java index b89bffd..fcc8d37 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.CacheGetFutureHan import org.apache.ignite.internal.processors.cache.distributed.CacheNoValueClassOnServerNodeTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutMultiNodeSelfTest; import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheCreatePutTest; +import org.apache.ignite.internal.processors.cache.distributed.IgniteCacheSingleGetMessageTest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridCacheDhtTxPreloadSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheLockFailoverSelfTest; import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheMultiTxLockSelfTest; @@ -283,6 +284,8 @@ public class IgniteCacheTestSuite4 extends TestSuite { suite.addTestSuite(CacheGetFutureHangsSelfTest.class); + suite.addTestSuite(IgniteCacheSingleGetMessageTest.class); + return suite; } } \ No newline at end of file