Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3D9F9200BD3 for ; Mon, 21 Nov 2016 14:42:52 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3CAD2160B19; Mon, 21 Nov 2016 13:42:52 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 750C7160B22 for ; Mon, 21 Nov 2016 14:42:50 +0100 (CET) Received: (qmail 87773 invoked by uid 500); 21 Nov 2016 13:42:49 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 87630 invoked by uid 99); 21 Nov 2016 13:42:49 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Nov 2016 13:42:49 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 74B39E0999; Mon, 21 Nov 2016 13:42:49 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: av@apache.org To: commits@ignite.apache.org Date: Mon, 21 Nov 2016 13:43:01 -0000 Message-Id: <906792a7d32f43edb4bfa214786a71f2@git.apache.org> In-Reply-To: <7dc6e865feaf43a899457f50af238fc5@git.apache.org> References: <7dc6e865feaf43a899457f50af238fc5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [13/15] ignite git commit: IGNITE-2523 "single put" NEAR update request archived-at: Mon, 21 Nov 2016 13:42:52 -0000 IGNITE-2523 "single put" NEAR update request Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a24a394b Tree: 
http://git-wip-us.apache.org/repos/asf/ignite/tree/a24a394b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a24a394b Branch: refs/heads/ignite-4242 Commit: a24a394bb66ba0237a9e9ef940707d422b2980f0 Parents: 0234f67 Author: Konstantin Dudkov Authored: Mon Nov 21 13:53:58 2016 +0300 Committer: Konstantin Dudkov Committed: Mon Nov 21 13:53:58 2016 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 24 +- .../processors/cache/GridCacheIoManager.java | 64 +- .../processors/cache/GridCachePreloader.java | 11 + .../cache/GridCachePreloaderAdapter.java | 7 + .../dht/atomic/GridDhtAtomicCache.java | 96 +- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 12 +- ...idNearAtomicAbstractSingleUpdateRequest.java | 562 +++++++++ .../GridNearAtomicAbstractUpdateFuture.java | 15 +- .../GridNearAtomicAbstractUpdateRequest.java | 226 ++++ .../atomic/GridNearAtomicFullUpdateRequest.java | 1031 +++++++++++++++++ ...GridNearAtomicSingleUpdateFilterRequest.java | 226 ++++ .../GridNearAtomicSingleUpdateFuture.java | 137 ++- ...GridNearAtomicSingleUpdateInvokeRequest.java | 303 +++++ .../GridNearAtomicSingleUpdateRequest.java | 359 ++++++ .../dht/atomic/GridNearAtomicUpdateFuture.java | 42 +- .../dht/atomic/GridNearAtomicUpdateRequest.java | 1092 ------------------ .../dht/preloader/GridDhtPreloader.java | 20 + .../distributed/near/GridNearAtomicCache.java | 10 +- .../resources/META-INF/classnames.properties | 77 +- .../CacheAtomicSingleMessageCountSelfTest.java | 259 +++++ .../GridCacheAtomicMessageCountSelfTest.java | 18 +- .../IgniteCacheAtomicStopBusySelfTest.java | 10 +- .../IgniteCacheP2pUnmarshallingErrorTest.java | 2 +- ...niteCacheClientNodeChangingTopologyTest.java | 22 +- ...eAtomicInvalidPartitionHandlingSelfTest.java | 2 +- 25 files changed, 3332 insertions(+), 1295 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index fd55224..b20de68 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -69,7 +69,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnlock import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicDeferredUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; @@ -392,7 +395,7 
@@ public class GridIoMessageFactory implements MessageFactory { break; case 40: - msg = new GridNearAtomicUpdateRequest(); + msg = new GridNearAtomicFullUpdateRequest(); break; @@ -756,7 +759,22 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124] - this + case 125: + msg = new GridNearAtomicSingleUpdateRequest(); + + break; + + case 126: + msg = new GridNearAtomicSingleUpdateInvokeRequest(); + + break; + + case 127: + msg = new GridNearAtomicSingleUpdateFilterRequest(); + + break; + + // [-3..119] [124..127] - this // [120..123] - DR // [-4..-22, -30..-35] - SQL default: http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index e450287..c5c1c60 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -47,7 +47,11 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrep import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicFullUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateFilterRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateInvokeRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; @@ -462,8 +466,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @return Atomic future ID if applicable for message. */ @Nullable private GridCacheVersion atomicFututeId(GridCacheMessage cacheMsg) { - if (cacheMsg instanceof GridNearAtomicUpdateRequest) - return ((GridNearAtomicUpdateRequest)cacheMsg).futureVersion(); + if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) + return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).futureVersion(); else if (cacheMsg instanceof GridNearAtomicUpdateResponse) return ((GridNearAtomicUpdateResponse) cacheMsg).futureVersion(); else if (cacheMsg instanceof GridDhtAtomicUpdateRequest) @@ -480,8 +484,8 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { * @return Atomic future ID if applicable for message. 
*/ @Nullable private GridCacheVersion atomicWriteVersion(GridCacheMessage cacheMsg) { - if (cacheMsg instanceof GridNearAtomicUpdateRequest) - return ((GridNearAtomicUpdateRequest)cacheMsg).updateVersion(); + if (cacheMsg instanceof GridNearAtomicAbstractUpdateRequest) + return ((GridNearAtomicAbstractUpdateRequest)cacheMsg).updateVersion(); else if (cacheMsg instanceof GridDhtAtomicUpdateRequest) return ((GridDhtAtomicUpdateRequest)cacheMsg).writeVersion(); @@ -562,7 +566,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; case 40: { - GridNearAtomicUpdateRequest req = (GridNearAtomicUpdateRequest)msg; + GridNearAtomicFullUpdateRequest req = (GridNearAtomicFullUpdateRequest)msg; GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( ctx.cacheId(), @@ -739,6 +743,54 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { break; + case 125: { + GridNearAtomicSingleUpdateRequest req = (GridNearAtomicSingleUpdateRequest)msg; + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + ctx.cacheId(), + nodeId, + req.futureVersion(), + ctx.deploymentEnabled()); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + } + + break; + + case 126: { + GridNearAtomicSingleUpdateInvokeRequest req = (GridNearAtomicSingleUpdateInvokeRequest)msg; + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + ctx.cacheId(), + nodeId, + req.futureVersion(), + ctx.deploymentEnabled()); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, ctx.ioPolicy()); + } + + break; + + case 127: { + GridNearAtomicSingleUpdateFilterRequest req = (GridNearAtomicSingleUpdateFilterRequest)msg; + + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse( + ctx.cacheId(), + nodeId, + req.futureVersion(), + ctx.deploymentEnabled()); + + res.error(req.classError()); + + sendResponseOnFailedMessage(nodeId, res, cctx, 
ctx.ioPolicy()); + } + + break; + default: throw new IgniteCheckedException("Failed to send response to node. Unsupported direct type [message=" + msg + "]", msg.classError()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index a49bb04..1d1cfab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -24,6 +24,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; @@ -140,6 +141,16 @@ public interface GridCachePreloader { public IgniteInternalFuture request(Collection keys, AffinityTopologyVersion topVer); /** + * Requests that preloader sends the request for the key. + * + * @param req Message with keys to request. + * @param topVer Topology version, {@code -1} if not required. + * @return Future to complete when all keys are preloaded. 
+ */ + public IgniteInternalFuture request(GridNearAtomicAbstractUpdateRequest req, + AffinityTopologyVersion topVer); + + /** * Force preload process. */ public void forcePreload(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index 58b75df..b15ebc5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -26,6 +26,7 @@ import org.apache.ignite.cache.affinity.AffinityFunction; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessageV2; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; @@ -149,6 +150,12 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ + @Override public IgniteInternalFuture request(GridNearAtomicAbstractUpdateRequest req, + AffinityTopologyVersion topVer) { + return new GridFinishedFuture<>(); + } + + /** {@inheritDoc} */ @Override public void onInitialExchangeComplete(@Nullable Throwable 
err) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 30a3d57..f7d1973 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -136,7 +136,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { /** Update reply closure. */ @GridToStringExclude - private CI2 updateReplyClos; + private CI2 updateReplyClos; /** Pending */ private GridDeferredAckMessageSender deferredUpdateMessageSender; @@ -200,9 +200,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Override protected void init() { super.init(); - updateReplyClos = new CI2() { + updateReplyClos = new CI2() { @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { assert req.writeSynchronizationMode() != FULL_ASYNC : req; @@ -323,11 +323,11 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { ctx.io().addHandler( ctx.cacheId(), - GridNearAtomicUpdateRequest.class, - new CI2() { + GridNearAtomicAbstractUpdateRequest.class, + new CI2() { @Override public void apply( UUID nodeId, - GridNearAtomicUpdateRequest req + GridNearAtomicAbstractUpdateRequest req ) { 
processNearAtomicUpdateRequest( nodeId, @@ -335,8 +335,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } @Override public String toString() { - return "GridNearAtomicUpdateRequest handler " + - "[msgIdx=" + GridNearAtomicUpdateRequest.CACHE_MSG_IDX + ']'; + return "GridNearAtomicAbstractUpdateRequest handler " + + "[msgIdx=" + GridNearAtomicAbstractUpdateRequest.CACHE_MSG_IDX + ']'; } }); @@ -1252,7 +1252,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { CacheEntryPredicate[] filters = CU.filterArray(filter); - if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters, op)) { + if (conflictPutVal == null && + conflictRmvVer == null && + !isFastMap(filters, op)) { return new GridNearAtomicSingleUpdateFuture( ctx, this, @@ -1603,10 +1605,10 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { */ public void updateAllAsyncInternal( final UUID nodeId, - final GridNearAtomicUpdateRequest req, - final CI2 completionCb + final GridNearAtomicAbstractUpdateRequest req, + final CI2 completionCb ) { - IgniteInternalFuture forceFut = preldr.request(req.keys(), req.topologyVersion()); + IgniteInternalFuture forceFut = preldr.request(req, req.topologyVersion()); if (forceFut == null || forceFut.isDone()) { try { @@ -1652,8 +1654,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param e Error. */ private void onForceKeysError(final UUID nodeId, - final GridNearAtomicUpdateRequest req, - final CI2 completionCb, + final GridNearAtomicAbstractUpdateRequest req, + final CI2 completionCb, IgniteCheckedException e ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), @@ -1673,17 +1675,15 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param req Update request. * @param completionCb Completion callback. 
*/ - public void updateAllAsyncInternal0( + private void updateAllAsyncInternal0( UUID nodeId, - GridNearAtomicUpdateRequest req, - CI2 completionCb + GridNearAtomicAbstractUpdateRequest req, + CI2 completionCb ) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled()); - List keys = req.keys(); - - assert !req.returnValue() || (req.operation() == TRANSFORM || keys.size() == 1); + assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); GridDhtAtomicUpdateFuture dhtFut = null; @@ -1696,7 +1696,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { try { // If batch store update is enabled, we need to lock all entries. // First, need to acquire locks on cache entries, then check filter. - List locked = lockEntries(keys, req.topologyVersion()); + List locked = lockEntries(req, req.topologyVersion()); Collection> deleted = null; @@ -1707,7 +1707,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { try { if (top.stopping()) { - res.addFailedKeys(keys, new IgniteCheckedException("Failed to perform cache operation " + + res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " + "(cache is stopped): " + name())); completionCb.apply(req, res); @@ -1757,7 +1757,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { GridCacheReturn retVal = null; - if (keys.size() > 1 && // Several keys ... + if (req.size() > 1 && // Several keys ... writeThrough() && !req.skipStore() && // and store is enabled ... !ctx.store().isLocal() && // and this is not local store ... // (conflict resolver should be used for local store) @@ -1853,7 +1853,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { // an attempt to use cleaned resources. 
U.error(log, "Unexpected exception during cache update", e); - res.addFailedKeys(keys, e); + res.addFailedKeys(req.keys(), e); completionCb.apply(req, res); @@ -1866,7 +1866,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (remap) { assert dhtFut == null; - res.remapKeys(keys); + res.remapKeys(req.keys()); completionCb.apply(req, res); } @@ -1904,12 +1904,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { private UpdateBatchResult updateWithBatch( final ClusterNode node, final boolean hasNear, - final GridNearAtomicUpdateRequest req, + final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final List locked, final GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - final CI2 completionCb, + final CI2 completionCb, final boolean replicate, final String taskName, @Nullable final IgniteCacheExpiryPolicy expiry, @@ -1929,7 +1929,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } } - int size = req.keys().size(); + int size = req.size(); Map putMap = null; @@ -2327,12 +2327,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { private UpdateSingleResult updateSingle( ClusterNode node, boolean hasNear, - GridNearAtomicUpdateRequest req, + GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res, List locked, GridCacheVersion ver, @Nullable GridDhtAtomicUpdateFuture dhtFut, - CI2 completionCb, + CI2 completionCb, boolean replicate, String taskName, @Nullable IgniteCacheExpiryPolicy expiry, @@ -2341,8 +2341,6 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { GridCacheReturn retVal = null; Collection> deleted = null; - List keys = req.keys(); - AffinityTopologyVersion topVer = req.topologyVersion(); boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); @@ -2352,8 +2350,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { boolean intercept = ctx.config().getInterceptor() != null; // Avoid 
iterator creation. - for (int i = 0; i < keys.size(); i++) { - KeyCacheObject k = keys.get(i); + for (int i = 0; i < req.size(); i++) { + KeyCacheObject k = req.key(i); GridCacheOperation op = req.operation(); @@ -2489,7 +2487,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { if (updRes.removeVersion() != null) { if (deleted == null) - deleted = new ArrayList<>(keys.size()); + deleted = new ArrayList<>(req.size()); deleted.add(F.t(entry, updRes.removeVersion())); } @@ -2565,8 +2563,8 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { @Nullable final Collection rmvKeys, @Nullable final Map> entryProcessorMap, @Nullable GridDhtAtomicUpdateFuture dhtFut, - final CI2 completionCb, - final GridNearAtomicUpdateRequest req, + final CI2 completionCb, + final GridNearAtomicAbstractUpdateRequest req, final GridNearAtomicUpdateResponse res, final boolean replicate, final UpdateBatchResult batchRes, @@ -2803,17 +2801,17 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { /** * Acquires java-level locks on cache entries. Returns collection of locked entries. * - * @param keys Keys to lock. + * @param req Request with keys to lock. * @param topVer Topology version to lock on. * @return Collection of locked entries. * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown, * locks are released. 
*/ @SuppressWarnings("ForLoopReplaceableByForEach") - private List lockEntries(List keys, AffinityTopologyVersion topVer) + private List lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { - if (keys.size() == 1) { - KeyCacheObject key = keys.get(0); + if (req.size() == 1) { + KeyCacheObject key = req.key(0); while (true) { try { @@ -2836,12 +2834,12 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { } } else { - List locked = new ArrayList<>(keys.size()); + List locked = new ArrayList<>(req.size()); while (true) { - for (KeyCacheObject key : keys) { + for (int i = 0; i < req.size(); i++) { try { - GridDhtCacheEntry entry = entryExx(key, topVer); + GridDhtCacheEntry entry = entryExx(req.key(i), topVer); locked.add(entry); } @@ -2946,7 +2944,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * will return false. * @return {@code True} if filter evaluation succeeded. */ - private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicUpdateRequest req, + private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { try { return ctx.isAllLocked(entry, req.filter()); @@ -2961,7 +2959,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { /** * @param req Request to remap. 
*/ - private void remapToNewPrimary(GridNearAtomicUpdateRequest req) { + private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) { assert req.writeSynchronizationMode() == FULL_ASYNC : req; if (log.isDebugEnabled()) @@ -3040,9 +3038,9 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { */ @Nullable private GridDhtAtomicUpdateFuture createDhtFuture( GridCacheVersion writeVer, - GridNearAtomicUpdateRequest updateReq, + GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes, - CI2 completionCb, + CI2 completionCb, boolean force ) { if (!force) { @@ -3073,7 +3071,7 @@ public class GridDhtAtomicCache extends GridDhtCacheAdapter { * @param nodeId Sender node ID. * @param req Near atomic update request. */ - private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) { + private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (msgLog.isDebugEnabled()) { msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() + ", writeVer=" + req.updateVersion() + http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4e59d11..c2ad8b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -83,7 +83,7 @@ public class GridDhtAtomicUpdateFuture extends 
GridFutureAdapter /** Completion callback. */ @GridToStringExclude - private final CI2 completionCb; + private final CI2 completionCb; /** Mappings. */ @GridToStringInclude @@ -93,7 +93,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter private Map nearReadersEntries; /** Update request. */ - private final GridNearAtomicUpdateRequest updateReq; + private final GridNearAtomicAbstractUpdateRequest updateReq; /** Update response. */ private final GridNearAtomicUpdateResponse updateRes; @@ -119,9 +119,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter */ public GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2 completionCb, + CI2 completionCb, GridCacheVersion writeVer, - GridNearAtomicUpdateRequest updateReq, + GridNearAtomicAbstractUpdateRequest updateReq, GridNearAtomicUpdateResponse updateRes ) { this.cctx = cctx; @@ -137,8 +137,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); } - keys = new ArrayList<>(updateReq.keys().size()); - mappings = U.newHashMap(updateReq.keys().size()); + keys = new ArrayList<>(updateReq.size()); + mappings = U.newHashMap(updateReq.size()); waitForExchange = !(updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest())); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java new file mode 100644 index 0000000..61deeee --- /dev/null +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java @@ -0,0 +1,562 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.UUID; +import javax.cache.expiry.ExpiryPolicy; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class 
GridNearAtomicAbstractSingleUpdateRequest extends GridNearAtomicAbstractUpdateRequest { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final CacheEntryPredicate[] NO_FILTER = new CacheEntryPredicate[0]; + + /** Fast map flag mask. */ + private static final int FAST_MAP_FLAG_MASK = 0x1; + + /** Flag indicating whether request contains primary keys. */ + private static final int HAS_PRIMARY_FLAG_MASK = 0x2; + + /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ + private static final int TOP_LOCKED_FLAG_MASK = 0x4; + + /** Skip write-through to a persistent storage. */ + private static final int SKIP_STORE_FLAG_MASK = 0x8; + + /** */ + private static final int CLIENT_REQ_FLAG_MASK = 0x10; + + /** Keep binary flag. */ + private static final int KEEP_BINARY_FLAG_MASK = 0x20; + + /** Return value flag. */ + private static final int RET_VAL_FLAG_MASK = 0x40; + + /** Target node ID. */ + @GridDirectTransient + protected UUID nodeId; + + /** Future version. */ + protected GridCacheVersion futVer; + + /** Update version. Set to non-null if fastMap is {@code true}. */ + private GridCacheVersion updateVer; + + /** Topology version. */ + protected AffinityTopologyVersion topVer; + + /** Write synchronization mode. */ + protected CacheWriteSynchronizationMode syncMode; + + /** Update operation. */ + protected GridCacheOperation op; + + /** Subject ID. */ + protected UUID subjId; + + /** Task name hash. */ + protected int taskNameHash; + + /** */ + @GridDirectTransient + private GridNearAtomicUpdateResponse res; + + /** Compressed boolean flags. */ + protected byte flags; + + /** + * Empty constructor required by {@link Externalizable}. + */ + protected GridNearAtomicAbstractSingleUpdateRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheId Cache ID. + * @param nodeId Node ID. + * @param futVer Future version. + * @param fastMap Fast map scheme flag. 
+ * @param updateVer Update version set if fast map is performed. + * @param topVer Topology version. + * @param topLocked Topology locked flag. + * @param syncMode Synchronization mode. + * @param op Cache update operation. + * @param retval Return value required flag. + * @param subjId Subject ID. + * @param taskNameHash Task name hash code. + * @param skipStore Skip write-through to a persistent storage. + * @param keepBinary Keep binary flag. + * @param clientReq Client node request flag. + * @param addDepInfo Deployment info flag. + */ + protected GridNearAtomicAbstractSingleUpdateRequest( + int cacheId, + UUID nodeId, + GridCacheVersion futVer, + boolean fastMap, + @Nullable GridCacheVersion updateVer, + @NotNull AffinityTopologyVersion topVer, + boolean topLocked, + CacheWriteSynchronizationMode syncMode, + GridCacheOperation op, + boolean retval, + @Nullable UUID subjId, + int taskNameHash, + boolean skipStore, + boolean keepBinary, + boolean clientReq, + boolean addDepInfo + ) { + assert futVer != null; + + this.cacheId = cacheId; + this.nodeId = nodeId; + this.futVer = futVer; + this.updateVer = updateVer; + this.topVer = topVer; + this.syncMode = syncMode; + this.op = op; + this.subjId = subjId; + this.taskNameHash = taskNameHash; + this.addDepInfo = addDepInfo; + + fastMap(fastMap); + topologyLocked(topLocked); + returnValue(retval); + skipStore(skipStore); + keepBinary(keepBinary); + clientRequest(clientReq); + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** + * @return Mapped node ID. + */ + @Override public UUID nodeId() { + return nodeId; + } + + /** + * @param nodeId Node ID. + */ + @Override public void nodeId(UUID nodeId) { + this.nodeId = nodeId; + } + + /** + * @return Subject ID. + */ + @Override public UUID subjectId() { + return subjId; + } + + /** + * @return Task name hash. + */ + @Override public int taskNameHash() { + return taskNameHash; + } + + /** + * @return Future version. 
+ */ + @Override public GridCacheVersion futureVersion() { + return futVer; + } + + /** + * @return Update version for fast-map request. + */ + @Override public GridCacheVersion updateVersion() { + return updateVer; + } + + /** + * @return Topology version. + */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Cache write synchronization mode. + */ + @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { + return syncMode; + } + + /** + * @return Expiry policy. + */ + @Override public ExpiryPolicy expiry() { + return null; + } + + /** + * @return Update operation. + */ + @Override public GridCacheOperation operation() { + return op; + } + + /** + * @return Optional arguments for entry processor. + */ + @Override @Nullable public Object[] invokeArguments() { + return null; + } + + /** + * @param res Response. + * @return {@code True} if current response was {@code null}. + */ + @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { + if (this.res == null) { + this.res = res; + + return true; + } + + return false; + } + + /** + * @return Response. + */ + @Override @Nullable public GridNearAtomicUpdateResponse response() { + return res; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { + return ctx.atomicMessageLogger(); + } + + /** + * @return Flag indicating whether this is fast-map update. + */ + @Override public boolean fastMap() { + return isFlag(FAST_MAP_FLAG_MASK); + } + + /** + * Sets fastMap flag value. + */ + public void fastMap(boolean val) { + setFlag(val, FAST_MAP_FLAG_MASK); + } + + /** + * @return Topology locked flag. + */ + @Override public boolean topologyLocked() { + return isFlag(TOP_LOCKED_FLAG_MASK); + } + + /** + * Sets topologyLocked flag value. 
+ */ + public void topologyLocked(boolean val) { + setFlag(val, TOP_LOCKED_FLAG_MASK); + } + + /** + * @return {@code True} if request sent from client node. + */ + @Override public boolean clientRequest() { + return isFlag(CLIENT_REQ_FLAG_MASK); + } + + /** + * Sets clientRequest flag value. + */ + public void clientRequest(boolean val) { + setFlag(val, CLIENT_REQ_FLAG_MASK); + } + + /** + * @return Return value flag. + */ + @Override public boolean returnValue() { + return isFlag(RET_VAL_FLAG_MASK); + } + + /** + * Sets returnValue flag value. + */ + public void returnValue(boolean val) { + setFlag(val, RET_VAL_FLAG_MASK); + } + + /** + * @return Skip write-through to a persistent storage. + */ + @Override public boolean skipStore() { + return isFlag(SKIP_STORE_FLAG_MASK); + } + + /** + * Sets skipStore flag value. + */ + public void skipStore(boolean val) { + setFlag(val, SKIP_STORE_FLAG_MASK); + } + + /** + * @return Keep binary flag. + */ + @Override public boolean keepBinary() { + return isFlag(KEEP_BINARY_FLAG_MASK); + } + + /** + * Sets keepBinary flag value. + */ + public void keepBinary(boolean val) { + setFlag(val, KEEP_BINARY_FLAG_MASK); + } + + /** + * @return Flag indicating whether this request contains primary keys. + */ + @Override public boolean hasPrimary() { + return isFlag(HAS_PRIMARY_FLAG_MASK); + } + + /** + * Sets hasPrimary flag value. + */ + public void hasPrimary(boolean val) { + setFlag(val, HAS_PRIMARY_FLAG_MASK); + } + + /** {@inheritDoc} */ + @Nullable @Override public CacheEntryPredicate[] filter() { + return NO_FILTER; + } + + /** + * Sets flag mask. + * + * @param flag Set or clear. + * @param mask Mask. + */ + private void setFlag(boolean flag, int mask) { + flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. 
+ */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("futVer", futVer)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeMessage("updateVer", updateVer)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + futVer = reader.readMessage("futVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + byte opOrd; + + opOrd = reader.readByte("op"); + + if (!reader.isLastRead()) + return false; + + op = 
GridCacheOperation.fromOrdinal(opOrd); + + reader.incrementState(); + + case 6: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + byte syncModeOrd; + + syncModeOrd = reader.readByte("syncMode"); + + if (!reader.isLastRead()) + return false; + + syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + + reader.incrementState(); + + case 8: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + updateVer = reader.readMessage("updateVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearAtomicAbstractSingleUpdateRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 11; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 85751bb..2fbabaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import java.util.UUID; +import 
java.util.concurrent.atomic.AtomicReference; +import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; @@ -36,10 +39,6 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import javax.cache.expiry.ExpiryPolicy; -import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; - import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; @@ -255,11 +254,11 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param nodeId Node ID. * @param req Request. */ - protected void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + protected void mapSingle(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { if (cctx.localNodeId().equals(nodeId)) { cache.updateAllAsyncInternal(nodeId, req, - new CI2() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + new CI2() { + @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { onResult(res.nodeId(), res, false); } }); @@ -303,7 +302,7 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt * @param req Request. * @param e Error. 
*/ - protected void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + protected void onSendError(GridNearAtomicAbstractUpdateRequest req, IgniteCheckedException e) { synchronized (mux) { GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), req.nodeId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/a24a394b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java new file mode 100644 index 0000000..bee2ecd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateRequest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.util.List; +import java.util.UUID; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public abstract class GridNearAtomicAbstractUpdateRequest extends GridCacheMessage implements GridCacheDeployable { + /** Message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** + * @return Mapped node ID. + */ + public abstract UUID nodeId(); + + /** + * @param nodeId Node ID. + */ + public abstract void nodeId(UUID nodeId); + + /** + * @return Subject ID. + */ + public abstract UUID subjectId(); + + /** + * @return Task name hash. + */ + public abstract int taskNameHash(); + + /** + * @return Future version. + */ + public abstract GridCacheVersion futureVersion(); + + /** + * @return Flag indicating whether this is fast-map update. + */ + public abstract boolean fastMap(); + + /** + * @return Update version for fast-map request. + */ + public abstract GridCacheVersion updateVersion(); + + /** + * @return Topology locked flag. + */ + public abstract boolean topologyLocked(); + + /** + * @return {@code True} if request sent from client node. + */ + public abstract boolean clientRequest(); + + /** + * @return Cache write synchronization mode. 
+ */ + public abstract CacheWriteSynchronizationMode writeSynchronizationMode(); + + /** + * @return Expiry policy. + */ + public abstract ExpiryPolicy expiry(); + + /** + * @return Return value flag. + */ + public abstract boolean returnValue(); + + /** + * @return Filter. + */ + @Nullable public abstract CacheEntryPredicate[] filter(); + + /** + * @return Skip write-through to a persistent storage. + */ + public abstract boolean skipStore(); + + /** + * @return Keep binary flag. + */ + public abstract boolean keepBinary(); + + /** + * @return Update operation. + */ + public abstract GridCacheOperation operation(); + + /** + * @return Optional arguments for entry processor. + */ + @Nullable public abstract Object[] invokeArguments(); + + /** + * @return Flag indicating whether this request contains primary keys. + */ + public abstract boolean hasPrimary(); + + /** + * @param res Response. + * @return {@code True} if current response was {@code null}. + */ + public abstract boolean onResponse(GridNearAtomicUpdateResponse res); + + /** + * @return Response. + */ + @Nullable public abstract GridNearAtomicUpdateResponse response(); + + /** + * @param key Key to add. + * @param val Optional update value. + * @param conflictTtl Conflict TTL (optional). + * @param conflictExpireTime Conflict expire time (optional). + * @param conflictVer Conflict version (optional). + * @param primary If given key is primary on this mapping. + */ + public abstract void addUpdateEntry(KeyCacheObject key, + @Nullable Object val, + long conflictTtl, + long conflictExpireTime, + @Nullable GridCacheVersion conflictVer, + boolean primary); + + /** + * @return Keys for this update request. + */ + public abstract List keys(); + + /** + * @return Values for this update request. + */ + public abstract List values(); + + /** + * @param idx Key index. + * @return Value. + */ + public abstract CacheObject value(int idx); + + /** + * @param idx Key index. + * @return Entry processor. 
+ */ + public abstract EntryProcessor entryProcessor(int idx); + + /** + * @param idx Index to get. + * @return Write value - either value, or transform closure. + */ + public abstract CacheObject writeValue(int idx); + + + /** + * @return Conflict versions. + */ + @Nullable public abstract List conflictVersions(); + + /** + * @param idx Index. + * @return Conflict version. + */ + @Nullable public abstract GridCacheVersion conflictVersion(int idx); + + /** + * @param idx Index. + * @return Conflict TTL. + */ + public abstract long conflictTtl(int idx); + + /** + * @param idx Index. + * @return Conflict expire time. + */ + public abstract long conflictExpireTime(int idx); + + /** + * Cleanup values. + * + * @param clearKeys If {@code true} clears keys. + */ + public abstract void cleanup(boolean clearKeys); + + /** + * @return Keys size. + */ + public abstract int size(); + + /** + * @param idx Key index. + * @return Key. + */ + public abstract KeyCacheObject key(int idx); +}