Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0B7B610264 for ; Mon, 29 Dec 2014 13:48:49 +0000 (UTC) Received: (qmail 73275 invoked by uid 500); 29 Dec 2014 13:48:44 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 73245 invoked by uid 500); 29 Dec 2014 13:48:44 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 73236 invoked by uid 99); 29 Dec 2014 13:48:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Dec 2014 13:48:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Mon, 29 Dec 2014 13:48:42 +0000 Received: (qmail 73074 invoked by uid 99); 29 Dec 2014 13:48:22 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Dec 2014 13:48:22 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id BB1A9920DF9; Mon, 29 Dec 2014 13:48:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Mon, 29 Dec 2014 13:48:22 -0000 Message-Id: <5a50be274c8e4c3c9dadbef1743e2cc0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] incubator-ignite git commit: ignite-44 X-Virus-Checked: Checked by ClamAV on apache.org ignite-44 Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/58de0b22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/58de0b22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/58de0b22 Branch: refs/heads/ignite-44-8273 Commit: 58de0b22f9b9ec7b08ae882308ab0b067638923c Parents: 1600e5c Author: sboikov Authored: Mon Dec 29 16:45:54 2014 +0300 Committer: sboikov Committed: Mon Dec 29 16:45:54 2014 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheInvokeResult.java | 9 +- .../kernal/processors/cache/GridCacheUtils.java | 29 +++++ .../distributed/GridDistributedLockRequest.java | 1 + .../dht/GridDhtTransactionalCacheAdapter.java | 46 ++++++-- .../colocated/GridDhtColocatedLockFuture.java | 6 ++ .../distributed/near/GridNearLockResponse.java | 106 ++++++++++++++++++- .../distributed/near/GridNearTxRemote.java | 6 ++ .../cache/transactions/IgniteTxEntry.java | 19 +++- .../transactions/IgniteTxLocalAdapter.java | 37 +++---- .../GridCacheReturnValueTransferSelfTest.java | 6 +- 10 files changed, 225 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java index ab0959e..4d51c4e 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/CacheInvokeResult.java @@ -45,8 +45,6 @@ public class CacheInvokeResult implements EntryProcessorResult, Externaliz * @param res Computed result. */ public CacheInvokeResult(T res) { - assert res != null; - this.res = res; } @@ -57,6 +55,13 @@ public class CacheInvokeResult implements EntryProcessorResult, Externaliz this.err = err; } + /** + * @return {@code True} if both result and error are {@code null}. + */ + public boolean empty() { + return res == null && err == null; + } + /** {@inheritDoc} */ @Override public Object ggClassId() { return GG_CLASS_ID; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java index 1646bf1..4e8f985 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/GridCacheUtils.java @@ -27,6 +27,7 @@ import org.jdk8.backport.*; import org.jetbrains.annotations.*; import javax.cache.expiry.*; +import javax.cache.processor.*; import java.io.*; import java.util.*; import java.util.concurrent.*; @@ -1671,4 +1672,32 @@ public class GridCacheUtils { return duration.getTimeUnit().toMillis(duration.getDurationAmount()); } + + /** + * @param txEntry Entry. + * @param val Value. + * @return Invoke result. + */ + @Nullable public static CacheInvokeResult computeInvokeResult( + IgniteTxEntry txEntry, V val, boolean ignoreNull) { + try { + Object res = null; + + for (T2, Object[]> t : txEntry.entryProcessors()) { + CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val); + + EntryProcessor entryProcessor = t.get1(); + + res = entryProcessor.process(invokeEntry, t.get2()); + } + + if (res == null && ignoreNull) + return null; + else + return new CacheInvokeResult<>(res); + } + catch (Exception e) { + return new CacheInvokeResult<>(e); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java index 749eff0..c8cdd65 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/GridDistributedLockRequest.java @@ -106,6 +106,7 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage } /** + * @param cacheId Cache ID. * @param nodeId Node ID. * @param nearXidVer Near transaction ID. * @param threadId Thread ID. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index b7ff63e..27f7336 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -955,10 +955,17 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach assert mappedVer != null; assert tx == null || tx.xidVersion().equals(mappedVer); + boolean nearCacheReq = U.hasNearCache(nearNode, ctx.name()); + try { // Send reply back to originating near node. GridNearLockResponse res = new GridNearLockResponse<>(ctx.cacheId(), - req.version(), req.futureId(), req.miniId(), tx != null && tx.onePhaseCommit(), entries.size(), err); + req.version(), + req.futureId(), + req.miniId(), + tx != null && tx.onePhaseCommit(), + entries.size(), + err); if (err == null) { res.pending(localDhtPendingVersions(entries, mappedVer)); @@ -984,11 +991,29 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach try { GridCacheVersion ver = e.version(); - boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); + IgniteTxEntry writeEntry = null; + + boolean ret; + + if (req.implicitTx()) { + ret = req.returnValue(i) || + (nearCacheReq && (dhtVer == null || !dhtVer.equals(ver))); + } + else + ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); + + boolean invoke = false; + + if (!ret && tx != null && req.hasTransforms()) { + writeEntry = tx.entry(ctx.txKey(e.key())); + + if (writeEntry.op() == TRANSFORM) + invoke = true; + } V val = null; - if (ret) + if (ret || invoke) val = e.innerGet(tx, /*swap*/true, /*read-through*/true, @@ -1014,7 +1039,8 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach boolean filterPassed = false; if (tx != null && tx.onePhaseCommit()) { - IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key())); + if (writeEntry == null) + writeEntry = tx.entry(ctx.txKey(e.key())); assert writeEntry != null : "Missing tx entry for locked cache entry: " + e; @@ -1032,6 +1058,7 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach filterPassed, ver, mappedVer, + invoke ? computeInvokeResult(writeEntry, val, false) : null, ctx); } catch (GridCacheFilterFailedException ex) { @@ -1043,7 +1070,7 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach else { // We include values into response since they are required for local // calls and won't be serialized. We are also including DHT version. - res.addValueBytes(null, null, false, e.version(), mappedVer, ctx); + res.addValueBytes(null, null, false, e.version(), mappedVer, null, ctx); } break; @@ -1069,8 +1096,13 @@ public abstract class GridDhtTransactionalCacheAdapter extends GridDhtCach U.error(log, "Failed to get value for lock reply message for node [node=" + U.toShortString(nearNode) + ", req=" + req + ']', e); - return new GridNearLockResponse<>(ctx.cacheId(), req.version(), req.futureId(), req.miniId(), false, - entries.size(), e); + return new GridNearLockResponse<>(ctx.cacheId(), + req.version(), + req.futureId(), + req.miniId(), + false, + entries.size(), + e); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index e6a4eb7..545ad31 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -1261,6 +1261,12 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentity // In colocated cache we must receive responses only for detached entries. assert txEntry.cached().detached(); + if (txEntry.op() == GridCacheOperation.TRANSFORM) { + CacheInvokeResult invokeRes = res.invokeResult(i); + + txEntry.invokeResult(invokeRes); + } + txEntry.markLocked(); GridDhtDetachedCacheEntry entry = (GridDhtDetachedCacheEntry)txEntry.cached(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java index 7711470..5ea89ad 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/distributed/near/GridNearLockResponse.java @@ -49,6 +49,15 @@ public class GridNearLockResponse extends GridDistributedLockResponse> invokeRes; + + /** Serialized results for invoke operation. */ + @GridDirectCollection(byte[].class) + private List invokeResBytes; + /** * Empty constructor (required by {@link Externalizable}). */ @@ -148,6 +157,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse extends GridDistributedLockResponse res, GridCacheContext ctx ) throws IgniteCheckedException { int idx = valuesSize(); @@ -167,10 +178,39 @@ public class GridNearLockResponse extends GridDistributedLockResponse(dhtVers.length); + + invokeRes.add(res); + } + // Delegate to super. addValueBytes(val, valBytes, ctx); } + /** + * @param idx Key index. + * @return Result for invoke operation. + */ + public CacheInvokeResult invokeResult(int idx) { + return invokeRes != null ? invokeRes.get(idx) : null; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + invokeResBytes = marshalCollection(invokeRes, ctx); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + invokeRes = unmarshalCollection(invokeResBytes, ctx, ldr); + } + /** {@inheritDoc} */ @SuppressWarnings({"CloneDoesntCallSuperClone", "CloneCallsConstructors"}) @Override public GridTcpCommunicationMessageAdapter clone() { @@ -192,6 +232,8 @@ public class GridNearLockResponse extends GridDistributedLockResponse extends GridDistributedLockResponse extends GridDistributedLockResponse extends GridDistributedLockResponse= 0) { + if (invokeResBytes == null) + invokeResBytes = new ArrayList<>(commState.readSize); + + for (int i = commState.readItems; i < commState.readSize; i++) { + byte[] _val = commState.getByteArray(); + + if (_val == BYTE_ARR_NOT_READ) + return false; + + invokeResBytes.add((byte[])_val); + + commState.readItems++; + } + } + + commState.readSize = -1; + commState.readItems = 0; + + commState.idx++; + + case 14: + if (commState.readSize == -1) { + if (buf.remaining() < 4) + return false; + + commState.readSize = commState.getInt(); + } + + if (commState.readSize >= 0) { if (mappedVers == null) mappedVers = new GridCacheVersion[commState.readSize]; @@ -385,7 +483,7 @@ public class GridNearLockResponse extends GridDistributedLockResponse extends GridDistributedLockResponse extends GridDistributedTxRemoteAdapter * @param ctx Cache registry. * @param txSize Expected transaction size. * @param grpLockKey Group lock key if this is a group-lock transaction. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. * @throws IgniteCheckedException If unmarshalling failed. */ public GridNearTxRemote( @@ -130,6 +132,8 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter * @param ctx Cache registry. * @param txSize Expected transaction size. * @param grpLockKey Collection of group lock keys if this is a group-lock transaction. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. */ public GridNearTxRemote( GridCacheSharedContext ctx, @@ -311,8 +315,10 @@ public class GridNearTxRemote extends GridDistributedTxRemoteAdapter } /** + * @param cacheCtx Cache context. * @param key Key to add to read set. * @param keyBytes Key bytes. + * @param op Cache operation. * @param val Value. * @param valBytes Value bytes. * @param drVer Data center replication version. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java index 73d17b5..25c1fb3 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxEntry.java @@ -70,10 +70,13 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, /** Filter bytes. */ private byte[] filterBytes; - /** Transform. */ + /** EntryProcessors for invoke operation. */ @GridToStringInclude private Collection, Object[]>> entryProcessorsCol; + /** */ + private CacheInvokeResult invokeRes; + /** Transform closure bytes. */ @GridToStringExclude private byte[] transformClosBytes; @@ -624,6 +627,20 @@ public class IgniteTxEntry implements GridPeerDeployAware, Externalizable, } /** + * @param invokeRes Remotely computed result for invoke operation. + */ + public void invokeResult(@Nullable CacheInvokeResult invokeRes) { + this.invokeRes = invokeRes; + } + + /** + * @return Remotely computed result for invoke operation. + */ + @Nullable public CacheInvokeResult invokeResult() { + return invokeRes; + } + + /** * @return Collection of entry processors. */ public Collection, Object[]>> entryProcessors() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java index 6380605..472f607 100644 --- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -2259,7 +2259,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter)) retval = true; - if (retval || txEntry.op() == TRANSFORM) { + boolean invoke = computeInvoke && txEntry.op() == TRANSFORM && txEntry.invokeResult() == null; + + if (retval || invoke) { if (!cacheCtx.isNear()) { try { if (!hasPrevVal) @@ -2288,14 +2290,18 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter v = cached.rawGetOrUnmarshal(false); } - if (txEntry.op() == TRANSFORM) { - if (computeInvoke) - addInvokeResult(txEntry, v, ret); - } - else + if (retval) ret.value(v); } + if (computeInvoke && txEntry.op() == TRANSFORM) { + CacheInvokeResult res = + invoke ? CU.computeInvokeResult(txEntry, v, true) : txEntry.invokeResult(); + + if (res != null && !res.empty()) + ret.addEntryProcessResult(k, res); + } + boolean pass = cacheCtx.isAll(cached, filter); // For remove operation we return true only if we are removing s/t, @@ -2359,23 +2365,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param ret Return value to update. */ private void addInvokeResult(IgniteTxEntry txEntry, V val, GridCacheReturn ret) { - try { - Object res = null; - - for (T2, Object[]> t : txEntry.entryProcessors()) { - CacheInvokeEntry invokeEntry = new CacheInvokeEntry<>(txEntry.key(), val); + CacheInvokeResult res = CU.computeInvokeResult(txEntry, val, true); - EntryProcessor entryProcessor = t.get1(); - - res = entryProcessor.process(invokeEntry, t.get2()); - } - - if (res != null) - ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult<>(res)); - } - catch (Exception e) { - ret.addEntryProcessResult(txEntry.key(), new CacheInvokeResult(e)); - } + if (res != null) + ret.addEntryProcessResult(txEntry.key(), res); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/58de0b22/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java index b65fcad..342740f 100644 --- a/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java +++ b/modules/core/src/test/java/org/gridgain/grid/kernal/processors/cache/GridCacheReturnValueTransferSelfTest.java @@ -90,17 +90,15 @@ public class GridCacheReturnValueTransferSelfTest extends GridCommonAbstractTest /** * @throws Exception If failed. - * TODO gg-8273 enable when fixed */ - public void _testTransformTransactionalNoBackups() throws Exception { + public void testTransformTransactionalNoBackups() throws Exception { checkTransform(TRANSACTIONAL, PRIMARY, 0); } /** * @throws Exception If failed. - * TODO gg-8273 enable when fixed */ - public void _testTransformTransactionalOneBackup() throws Exception { + public void testTransformTransactionalOneBackup() throws Exception { checkTransform(TRANSACTIONAL, PRIMARY, 1); }