Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EA396200C4D for ; Tue, 21 Mar 2017 10:24:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E8C94160B74; Tue, 21 Mar 2017 09:24:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EF2C7160BAF for ; Tue, 21 Mar 2017 10:23:54 +0100 (CET) Received: (qmail 64527 invoked by uid 500); 21 Mar 2017 09:23:54 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 63161 invoked by uid 99); 21 Mar 2017 09:23:53 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Mar 2017 09:23:53 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 297DCDFF71; Tue, 21 Mar 2017 09:23:53 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Tue, 21 Mar 2017 09:24:22 -0000 Message-Id: <7ad17d79fa95408f845f11918a28d12b@git.apache.org> In-Reply-To: <4661a570aee14e228ed239b8206de8cf@git.apache.org> References: <4661a570aee14e228ed239b8206de8cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [31/71] [abbrv] ignite git commit: ignite-4705 Atomic cache protocol change: notify client node from backups archived-at: Tue, 21 Mar 2017 09:24:01 -0000 http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java index 923b220..9fe183f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicDeferredUpdateResponse.java @@ -19,17 +19,18 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; import java.nio.ByteBuffer; -import java.util.Collection; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; /** * Deferred dht atomic update response. @@ -42,13 +43,12 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem public static final int CACHE_MSG_IDX = nextIndexId(); /** ACK future versions. */ - @GridDirectCollection(GridCacheVersion.class) - private Collection futVers; + private GridLongList futIds; - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } + /** */ + @GridDirectTransient + @GridToStringExclude + private GridTimeoutObject timeoutSnd; /** * Empty constructor required by {@link Externalizable} @@ -61,27 +61,42 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem * Constructor. * * @param cacheId Cache ID. - * @param futVers Future versions. - * @param addDepInfo Deployment info. + * @param futIds Future IDs. */ - public GridDhtAtomicDeferredUpdateResponse(int cacheId, Collection futVers, boolean addDepInfo) { - assert !F.isEmpty(futVers); - + public GridDhtAtomicDeferredUpdateResponse(int cacheId, GridLongList futIds) { this.cacheId = cacheId; - this.futVers = futVers; - this.addDepInfo = addDepInfo; + this.futIds = futIds; + } + + /** + * @param timeoutSnd Callback sending response on timeout. + */ + void timeoutSender(@Nullable GridTimeoutObject timeoutSnd) { + this.timeoutSnd = timeoutSnd; + } + + /** + * @return Callback sending response on timeout. + */ + @Nullable GridTimeoutObject timeoutSender() { + return timeoutSnd; + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; } /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { - return addDepInfo; + return false; } /** - * @return List of ACKed future versions. + * @return List of ACKed future ids. */ - public Collection futureVersions() { - return futVers; + GridLongList futureIds() { + return futIds; } /** {@inheritDoc} */ @@ -105,7 +120,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem switch (writer.state()) { case 3: - if (!writer.writeCollection("futVers", futVers, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("futIds", futIds)) return false; writer.incrementState(); @@ -127,7 +142,7 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem switch (reader.state()) { case 3: - futVers = reader.readCollection("futVers", MessageCollectionItemType.MSG); + futIds = reader.readMessage("futIds"); if (!reader.isLastRead()) return false; @@ -148,4 +163,9 @@ public class GridDhtAtomicDeferredUpdateResponse extends GridCacheMessage implem @Override public byte fieldsCount() { return 4; } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAtomicDeferredUpdateResponse.class, this); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java new file mode 100644 index 0000000..08a7e28 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicNearResponse.java @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicAbstractUpdateRequest.DHT_ATOMIC_HAS_RESULT_MASK; + +/** + * Message sent from DHT nodes to near node in FULL_SYNC mode. + */ +public class GridDhtAtomicNearResponse extends GridCacheMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** */ + private int partId; + + /** */ + private long futId; + + /** */ + private UUID primaryId; + + /** */ + @GridToStringExclude + private byte flags; + + /** */ + @GridToStringInclude + private UpdateErrors errs; + + /** + * + */ + public GridDhtAtomicNearResponse() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param partId Partition. + * @param futId Future ID. + * @param primaryId Primary node ID. + * @param flags Flags. + */ + public GridDhtAtomicNearResponse(int cacheId, + int partId, + long futId, + UUID primaryId, + byte flags) + { + assert primaryId != null; + + this.cacheId = cacheId; + this.partId = partId; + this.futId = futId; + this.primaryId = primaryId; + this.flags = flags; + } + + /** + * @return Errors. + */ + @Nullable UpdateErrors errors() { + return errs; + } + + /** + * @param errs Errors. + */ + public void errors(UpdateErrors errs) { + this.errs = errs; + } + + /** + * @return Primary node ID. + */ + UUID primaryId() { + return primaryId; + } + + /** {@inheritDoc} */ + @Override public int partition() { + return partId; + } + + /** + * @param key Key. + * @param e Error. + */ + public void addFailedKey(KeyCacheObject key, Throwable e) { + if (errs == null) + errs = new UpdateErrors(); + + errs.addFailedKey(key, e); + } + + /** + * @return Operation result. + */ + public GridCacheReturn result() { + assert hasResult() : this; + + return new GridCacheReturn(true, true); + } + + /** + * @return {@code True} if response contains operation result. + */ + boolean hasResult() { + return isFlag(DHT_ATOMIC_HAS_RESULT_MASK); + } + + /** + * Reads flag mask. + * + * @param mask Mask to read. + * @return Flag value. + */ + private boolean isFlag(int mask) { + return (flags & mask) != 0; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -45; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 8; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (errs != null) + errs.prepareMarshal(this, ctx.cacheContext(cacheId)); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (errs != null) + errs.finishUnmarshal(this, ctx.cacheContext(cacheId), ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeMessage("errs", errs)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeByte("flags", flags)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeInt("partId", partId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeUuid("primaryId", primaryId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + errs = reader.readMessage("errs"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + flags = reader.readByte("flags"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + partId = reader.readInt("partId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + primaryId = reader.readUuid("primaryId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtAtomicNearResponse.class); + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder flags = new StringBuilder(); + + if (hasResult()) + appendFlag(flags, "hasRes"); + + return S.toString(GridDhtAtomicNearResponse.class, this, + "flags", flags.toString()); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java index 0dc2754..8ebe9c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateFuture.java @@ -24,16 +24,11 @@ import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteProductVersion; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -45,64 +40,45 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture private static final long serialVersionUID = 0L; /** */ - private static final IgniteProductVersion SINGLE_UPDATE_REQUEST = IgniteProductVersion.fromString("1.7.4"); - - /** Future keys. */ - private KeyCacheObject key; - - /** Entries with readers. */ - private GridDhtCacheEntry nearReaderEntry; + private boolean allUpdated; /** * @param cctx Cache context. - * @param completionCb Callback to invoke when future is completed. * @param writeVer Write version. * @param updateReq Update request. - * @param updateRes Update response. */ GridDhtAtomicSingleUpdateFuture( GridCacheContext cctx, - CI2 completionCb, GridCacheVersion writeVer, - GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes + GridNearAtomicAbstractUpdateRequest updateReq ) { - super(cctx, - completionCb, - writeVer, - updateReq, - updateRes); + super(cctx, writeVer, updateReq); + } + + /** {@inheritDoc} */ + @Override protected boolean sendAllToDht() { + return allUpdated; } /** {@inheritDoc} */ @Override protected void addDhtKey(KeyCacheObject key, List dhtNodes) { - assert this.key == null || this.key.equals(key) : this.key; + if (mappings == null) { + allUpdated = true; - if (mappings == null) mappings = U.newHashMap(dhtNodes.size()); - - this.key = key; + } } /** {@inheritDoc} */ @Override protected void addNearKey(KeyCacheObject key, Collection readers) { - assert this.key == null || this.key.equals(key) : this.key; - if (mappings == null) mappings = U.newHashMap(readers.size()); - - this.key = key; - } - - /** {@inheritDoc} */ - @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) { - nearReaderEntry = entry; } /** {@inheritDoc} */ @Override protected GridDhtAtomicAbstractUpdateRequest createRequest( - ClusterNode node, - GridCacheVersion futVer, + UUID nodeId, + long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, @@ -110,11 +86,11 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture long conflictExpireTime, @Nullable GridCacheVersion conflictVer ) { - if (canUseSingleRequest(node, ttl, conflictExpireTime, conflictVer)) { + if (canUseSingleRequest(ttl, conflictExpireTime, conflictVer)) { return new GridDhtAtomicSingleUpdateRequest( cctx.cacheId(), - node.id(), - futVer, + nodeId, + futId, writeVer, syncMode, topVer, @@ -127,68 +103,37 @@ class GridDhtAtomicSingleUpdateFuture extends GridDhtAtomicAbstractUpdateFuture else { return new GridDhtAtomicUpdateRequest( cctx.cacheId(), - node.id(), - futVer, + nodeId, + futId, writeVer, syncMode, topVer, - false, updateReq.subjectId(), updateReq.taskNameHash(), null, cctx.deploymentEnabled(), updateReq.keepBinary(), - updateReq.skipStore()); + updateReq.skipStore(), + false); } } - /** {@inheritDoc} */ - @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { - if (log.isDebugEnabled()) - log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); - - if (updateRes.error() != null) - this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error()); - - if (!F.isEmpty(updateRes.nearEvicted())) { - try { - assert nearReaderEntry != null; - - nearReaderEntry.removeReader(nodeId, updateRes.messageId()); - } - catch (GridCacheEntryRemovedException e) { - if (log.isDebugEnabled()) - log.debug("Entry with evicted reader was removed [entry=" + nearReaderEntry + ", err=" + e + ']'); - } - } - - registerResponse(nodeId); - } - - /** {@inheritDoc} */ - @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) { - updateRes.addFailedKey(key, err); - } - /** - * @param node Target node * @param ttl TTL. * @param conflictExpireTime Conflict expire time. * @param conflictVer Conflict version. * @return {@code True} if it is possible to use {@link GridDhtAtomicSingleUpdateRequest}. */ - private boolean canUseSingleRequest(ClusterNode node, - long ttl, + private boolean canUseSingleRequest(long ttl, long conflictExpireTime, @Nullable GridCacheVersion conflictVer) { - return node.version().compareToIgnoreTimestamp(SINGLE_UPDATE_REQUEST) >= 0 && - (ttl == CU.TTL_NOT_CHANGED) && + return (ttl == CU.TTL_NOT_CHANGED) && (conflictExpireTime == CU.EXPIRE_TIME_CALCULATE) && conflictVer == null; } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtAtomicSingleUpdateFuture.class, this); + return S.toString(GridDhtAtomicSingleUpdateFuture.class, this, "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java index a7e6c24..6b92c02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java @@ -38,9 +38,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.KEEP_BINARY_FLAG_MASK; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; - /** * */ @@ -48,30 +45,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** */ private static final long serialVersionUID = 0L; - /** Near cache key flag. */ - private static final int NEAR_FLAG_MASK = 0x80; - - /** Future version. */ - protected GridCacheVersion futVer; - - /** Write version. */ - protected GridCacheVersion writeVer; - - /** Write synchronization mode. */ - protected CacheWriteSynchronizationMode syncMode; - - /** Topology version. */ - protected AffinityTopologyVersion topVer; - - /** Subject ID. */ - protected UUID subjId; - - /** Task name hash. */ - protected int taskNameHash; - - /** Additional flags. */ - protected byte flags; - /** Key to update. */ @GridToStringInclude protected KeyCacheObject key; @@ -87,9 +60,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** Partition. */ protected long updateCntr; - /** */ - protected int partId; - /** * Empty constructor required by {@link Externalizable}. */ @@ -102,7 +72,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. + * @param futId Future ID. * @param writeVer Write version for cache values. * @param syncMode Cache write synchronization mode. * @param topVer Topology version. @@ -115,7 +85,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat GridDhtAtomicSingleUpdateRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, + long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, @@ -125,19 +95,17 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat boolean keepBinary, boolean skipStore ) { - super(cacheId, nodeId); - this.futVer = futVer; - this.writeVer = writeVer; - this.syncMode = syncMode; - this.topVer = topVer; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.addDepInfo = addDepInfo; - - if (skipStore) - setFlag(true, SKIP_STORE_FLAG_MASK); - if (keepBinary) - setFlag(true, KEEP_BINARY_FLAG_MASK); + super(cacheId, + nodeId, + futId, + writeVer, + syncMode, + topVer, + subjId, + taskNameHash, + addDepInfo, + keepBinary, + skipStore); } /** @@ -148,7 +116,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). * @param addPrevVal If {@code true} adds previous value. - * @param partId Partition. * @param prevVal Previous value. * @param updateCntr Update counter. */ @@ -159,7 +126,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat long conflictExpireTime, @Nullable GridCacheVersion conflictVer, boolean addPrevVal, - int partId, @Nullable CacheObject prevVal, long updateCntr ) { @@ -167,11 +133,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat assert ttl <= 0 : ttl; assert conflictExpireTime <= 0 : conflictExpireTime; assert conflictVer == null : conflictVer; + assert key.partition() >= 0 : key; near(false); this.key = key; - this.partId = partId; this.val = val; if (addPrevVal) @@ -194,6 +160,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat long expireTime) { assert entryProcessor == null; assert ttl <= 0 : ttl; + assert key.partition() >= 0 : key; near(true); @@ -222,11 +189,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } /** {@inheritDoc} */ - @Override public boolean skipStore() { - return isFlag(SKIP_STORE_FLAG_MASK); - } - - /** {@inheritDoc} */ @Override public KeyCacheObject key(int idx) { assert idx == 0 : idx; @@ -235,14 +197,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** {@inheritDoc} */ @Override public int partition() { - return partId; - } + int p = key.partition(); - /** {@inheritDoc} */ - @Override public int partitionId(int idx) { - assert idx == 0 : idx; + assert p >= 0; - return partId; + return p; } /** {@inheritDoc} */ @@ -267,31 +226,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } /** {@inheritDoc} */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion writeVersion() { - return writeVer; - } - - /** {@inheritDoc} */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** {@inheritDoc} */ - @Override public UUID subjectId() { - return subjId; - } - - /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; - } - - /** {@inheritDoc} */ @Override @Nullable public CacheObject previousValue(int idx) { assert idx == 0 : idx; @@ -360,25 +294,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } /** {@inheritDoc} */ - @Override public boolean keepBinary() { - return isFlag(KEEP_BINARY_FLAG_MASK); - } - - /** - * - */ - private boolean near() { - return isFlag(NEAR_FLAG_MASK); - } - - /** - * - */ - private void near(boolean near) { - setFlag(near, NEAR_FLAG_MASK); - } - - /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); @@ -403,8 +318,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat finishUnmarshalObject(val, cctx, ldr); finishUnmarshalObject(prevVal, cctx, ldr); - - key.partition(partId); } /** {@inheritDoc} */ @@ -422,78 +335,30 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat } switch (writer.state()) { - case 3: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 5: + case 12: if (!writer.writeMessage("key", key)) return false; writer.incrementState(); - case 6: - if (!writer.writeInt("partId", partId)) - return false; - - writer.incrementState(); - - case 7: + case 13: if (!writer.writeMessage("prevVal", prevVal)) return false; writer.incrementState(); - case 8: - if (!writer.writeUuid("subjId", subjId)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 12: + case 14: if (!writer.writeLong("updateCntr", updateCntr)) return false; writer.incrementState(); - case 13: + case 15: if (!writer.writeMessage("val", val)) return false; writer.incrementState(); - case 14: - if (!writer.writeMessage("writeVer", writeVer)) - return false; - - writer.incrementState(); - } return true; @@ -510,23 +375,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat return false; switch (reader.state()) { - case 3: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: + case 12: key = reader.readMessage("key"); if (!reader.isLastRead()) @@ -534,15 +383,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 6: - partId = reader.readInt("partId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: + case 13: prevVal = reader.readMessage("prevVal"); if (!reader.isLastRead()) @@ -550,43 +391,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 8: - subjId = reader.readUuid("subjId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - byte syncModeOrd; - - syncModeOrd = reader.readByte("syncMode"); - - if (!reader.isLastRead()) - return false; - - syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); - - reader.incrementState(); - - case 10: - taskNameHash = reader.readInt("taskNameHash"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: + case 14: updateCntr = reader.readLong("updateCntr"); if (!reader.isLastRead()) @@ -594,7 +399,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 13: + case 15: val = reader.readMessage("val"); if (!reader.isLastRead()) @@ -602,14 +407,6 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat reader.incrementState(); - case 14: - writeVer = reader.readMessage("writeVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(GridDhtAtomicSingleUpdateRequest.class); @@ -652,27 +449,7 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 15; - } - - /** - * Sets flag mask. - * - * @param flag Set or clear. - * @param mask Mask. - */ - private void setFlag(boolean flag, int mask) { - flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); - } - - /** - * Reags flag mask. - * - * @param mask Mask to read. - * @return Flag value. - */ - private boolean isFlag(int mask) { - return (flags & mask) != 0; + return 16; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 5429adc..5d5ddf0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -17,22 +17,15 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.CI2; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.NotNull; @@ -45,89 +38,45 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { /** */ private static final long serialVersionUID = 0L; - /** Future keys. */ - private final Collection keys; - - /** Entries with readers. */ - private Map nearReadersEntries; - + /** */ + private int updateCntr; /** * @param cctx Cache context. - * @param completionCb Callback to invoke when future is completed. * @param writeVer Write version. * @param updateReq Update request. - * @param updateRes Update response. */ GridDhtAtomicUpdateFuture( GridCacheContext cctx, - CI2 completionCb, GridCacheVersion writeVer, - GridNearAtomicAbstractUpdateRequest updateReq, - GridNearAtomicUpdateResponse updateRes + GridNearAtomicAbstractUpdateRequest updateReq ) { - super(cctx, - completionCb, - writeVer, - updateReq, - updateRes); + super(cctx, writeVer, updateReq); - keys = new ArrayList<>(updateReq.size()); mappings = U.newHashMap(updateReq.size()); } /** {@inheritDoc} */ - @Override protected void addDhtKey(KeyCacheObject key, List dhtNodes) { - keys.add(key); - } - - /** {@inheritDoc} */ - @Override protected void addNearKey(KeyCacheObject key, Collection readers) { - keys.add(key); - } - - /** {@inheritDoc} */ - @Override protected void addNearReaderEntry(GridDhtCacheEntry entry) { - if (nearReadersEntries == null) - nearReadersEntries = new HashMap<>(); - - nearReadersEntries.put(entry.key(), entry); + @Override protected boolean sendAllToDht() { + return updateCntr == updateReq.size(); } /** {@inheritDoc} */ - @Override public void onResult(UUID nodeId, GridDhtAtomicUpdateResponse updateRes) { - if (log.isDebugEnabled()) - log.debug("Received DHT atomic update future result [nodeId=" + nodeId + ", updateRes=" + updateRes + ']'); - - if (updateRes.error() != null) - this.updateRes.addFailedKeys(updateRes.failedKeys(), updateRes.error()); - - if (!F.isEmpty(updateRes.nearEvicted())) { - for (KeyCacheObject key : updateRes.nearEvicted()) { - GridDhtCacheEntry entry = nearReadersEntries.get(key); - - try { - entry.removeReader(nodeId, updateRes.messageId()); - } - catch (GridCacheEntryRemovedException e) { - if (log.isDebugEnabled()) - log.debug("Entry with evicted reader was removed [entry=" + entry + ", err=" + e + ']'); - } - } - } + @Override protected void addDhtKey(KeyCacheObject key, List dhtNodes) { + assert updateCntr < updateReq.size(); - registerResponse(nodeId); + updateCntr++; } /** {@inheritDoc} */ - @Override protected void addFailedKeys(GridNearAtomicUpdateResponse updateRes, Throwable err) { - for (KeyCacheObject key : keys) - updateRes.addFailedKey(key, err); + @Override protected void addNearKey(KeyCacheObject key, Collection readers) { + // No-op. } /** {@inheritDoc} */ - @Override protected GridDhtAtomicAbstractUpdateRequest createRequest(ClusterNode node, - GridCacheVersion futVer, + @Override protected GridDhtAtomicAbstractUpdateRequest createRequest( + UUID nodeId, + long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, @@ -137,22 +86,22 @@ class GridDhtAtomicUpdateFuture extends GridDhtAtomicAbstractUpdateFuture { ) { return new GridDhtAtomicUpdateRequest( cctx.cacheId(), - node.id(), - futVer, + nodeId, + futId, writeVer, syncMode, topVer, - false, updateReq.subjectId(), updateReq.taskNameHash(), null, cctx.deploymentEnabled(), updateReq.keepBinary(), - updateReq.skipStore()); + updateReq.skipStore(), + false); } /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtAtomicUpdateFuture.class, this); + return S.toString(GridDhtAtomicUpdateFuture.class, this, "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7144963..6b8af8d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -44,8 +44,6 @@ import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK; - /** * Lite dht cache backup update request. */ @@ -53,15 +51,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** */ private static final long serialVersionUID = 0L; - /** Future version. */ - private GridCacheVersion futVer; - - /** Write version. */ - private GridCacheVersion writeVer; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - /** Keys to update. */ @GridToStringInclude @GridDirectCollection(KeyCacheObject.class) @@ -93,9 +82,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** Near expire times. */ private GridLongList nearExpireTimes; - /** Write synchronization mode. */ - private CacheWriteSynchronizationMode syncMode; - /** Near cache keys to update. */ @GridToStringInclude @GridDirectCollection(KeyCacheObject.class) @@ -132,25 +118,9 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** Entry processor arguments bytes. */ private byte[][] invokeArgsBytes; - /** Subject ID. */ - private UUID subjId; - - /** Task name hash. */ - private int taskNameHash; - /** Partition. */ private GridLongList updateCntrs; - /** */ - @GridDirectTransient - private List partIds; - - /** Keep binary flag. */ - private boolean keepBinary; - - /** Additional flags. */ - private byte flags; - /** * Empty constructor required by {@link Externalizable}. */ @@ -163,7 +133,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. + * @param futId Future ID. * @param writeVer Write version for cache values. * @param invokeArgs Optional arguments for entry processor. * @param syncMode Cache write synchronization mode. @@ -176,38 +146,36 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque public GridDhtAtomicUpdateRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, + long futId, GridCacheVersion writeVer, CacheWriteSynchronizationMode syncMode, @NotNull AffinityTopologyVersion topVer, - boolean forceTransformBackups, UUID subjId, int taskNameHash, Object[] invokeArgs, boolean addDepInfo, boolean keepBinary, - boolean skipStore + boolean skipStore, + boolean forceTransformBackups ) { - super(cacheId, nodeId); - - this.futVer = futVer; - this.writeVer = writeVer; - this.syncMode = syncMode; - this.topVer = topVer; - this.forceTransformBackups = forceTransformBackups; - this.subjId = subjId; - this.taskNameHash = taskNameHash; + super(cacheId, + nodeId, + futId, + writeVer, + syncMode, + topVer, + subjId, + taskNameHash, + addDepInfo, + keepBinary, + skipStore); assert invokeArgs == null || forceTransformBackups; + this.forceTransformBackups = forceTransformBackups; this.invokeArgs = invokeArgs; - this.addDepInfo = addDepInfo; - this.keepBinary = keepBinary; - - setFlag(skipStore, SKIP_STORE_FLAG_MASK); keys = new ArrayList<>(); - partIds = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); @@ -225,13 +193,12 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque long conflictExpireTime, @Nullable GridCacheVersion conflictVer, boolean addPrevVal, - int partId, @Nullable CacheObject prevVal, long updateCntr ) { - keys.add(key); + assert key.partition() >= 0 : key; - partIds.add(partId); + keys.add(key); if (forceTransformBackups) { assert entryProcessor != null; @@ -298,6 +265,8 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque EntryProcessor entryProcessor, long ttl, long expireTime) { + assert key.partition() >= 0 : key; + if (nearKeys == null) { nearKeys = new ArrayList<>(); @@ -350,31 +319,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } /** {@inheritDoc} */ - @Override public UUID subjectId() { - return subjId; - } - - /** {@inheritDoc} */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion writeVersion() { - return writeVer; - } - - /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; - } - - /** {@inheritDoc} */ @Override public AffinityTopologyVersion topologyVersion() { return topVer; } @@ -400,11 +344,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } /** {@inheritDoc} */ - @Override public int partitionId(int idx) { - return partIds.get(idx); - } - - /** {@inheritDoc} */ @Override public Long updateCounter(int updCntr) { if (updateCntrs != null && updCntr < updateCntrs.size()) return updateCntrs.get(updCntr); @@ -486,7 +425,13 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** {@inheritDoc} */ @Override public int partition() { - return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1; + assert !F.isEmpty(keys) || !F.isEmpty(nearKeys); + + int p = keys.size() > 0 ? keys.get(0).partition() : nearKeys.get(0).partition(); + + assert p >= 0; + + return p; } /** {@inheritDoc} */ @@ -512,16 +457,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } /** {@inheritDoc} */ - @Override public boolean keepBinary() { - return keepBinary; - } - - /** {@inheritDoc} */ - @Override public boolean skipStore() { - return isFlag(SKIP_STORE_FLAG_MASK); - } - - /** {@inheritDoc} */ @Override @Nullable public Object[] invokeArguments() { return invokeArgs; } @@ -584,13 +519,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque if (nearEntryProcessors == null) nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); } - - if (partIds != null && !partIds.isEmpty()) { - assert partIds.size() == keys.size(); - - for (int i = 0; i < keys.size(); i++) - keys.get(i).partition(partIds.get(i)); - } } /** {@inheritDoc} */ @@ -608,144 +536,96 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque } switch (writer.state()) { - case 3: + case 12: if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) return false; writer.incrementState(); - case 4: + case 13: if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 5: + case 14: if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 6: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 7: + case 15: if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups)) return false; writer.incrementState(); - case 8: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 9: + case 16: if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 10: - if (!writer.writeBoolean("keepBinary", keepBinary)) - return false; - - writer.incrementState(); - - case 11: + case 17: if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 12: + case 18: if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) return false; writer.incrementState(); - case 13: + case 19: if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) return false; writer.incrementState(); - case 14: + case 20: if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 15: + case 21: if (!writer.writeMessage("nearTtls", nearTtls)) return false; writer.incrementState(); - case 16: + case 22: if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 17: + case 23: if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 18: - if (!writer.writeUuid("subjId", subjId)) - return false; - - writer.incrementState(); - - case 19: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 20: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 21: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 22: + case 24: if (!writer.writeMessage("ttls", ttls)) return false; writer.incrementState(); - case 23: + case 25: if (!writer.writeMessage("updateCntrs", updateCntrs)) return false; writer.incrementState(); - case 24: + case 26: if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 25: - if (!writer.writeMessage("writeVer", writeVer)) - return false; - - writer.incrementState(); - } return true; @@ -762,7 +642,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque return false; switch (reader.state()) { - case 3: + case 12: conflictExpireTimes = reader.readMessage("conflictExpireTimes"); if (!reader.isLastRead()) @@ -770,7 +650,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 4: + case 13: conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -778,7 +658,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 5: + case 14: entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -786,15 +666,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 6: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: + case 15: forceTransformBackups = reader.readBoolean("forceTransformBackups"); if (!reader.isLastRead()) @@ -802,15 +674,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 8: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: + case 16: invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); if (!reader.isLastRead()) @@ -818,15 +682,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 10: - keepBinary = reader.readBoolean("keepBinary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: + case 17: keys = reader.readCollection("keys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -834,7 +690,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 12: + case 18: nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); if (!reader.isLastRead()) @@ -842,7 +698,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 13: + case 19: nearExpireTimes = reader.readMessage("nearExpireTimes"); if (!reader.isLastRead()) @@ -850,7 +706,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 14: + case 20: nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -858,7 +714,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 15: + case 21: nearTtls = reader.readMessage("nearTtls"); if (!reader.isLastRead()) @@ -866,7 +722,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 16: + case 22: nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -874,7 +730,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 17: + case 23: prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -882,43 +738,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 18: - subjId = reader.readUuid("subjId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 19: - byte syncModeOrd; - - syncModeOrd = reader.readByte("syncMode"); - - if (!reader.isLastRead()) - return false; - - syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); - - reader.incrementState(); - - case 20: - taskNameHash = reader.readInt("taskNameHash"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 21: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 22: + case 24: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -926,7 +746,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 23: + case 25: updateCntrs = reader.readMessage("updateCntrs"); if (!reader.isLastRead()) @@ -934,7 +754,7 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 24: + case 26: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -942,14 +762,6 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque reader.incrementState(); - case 25: - writeVer = reader.readMessage("writeVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class); @@ -968,30 +780,9 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 26; - } - - /** - * Sets flag mask. - * - * @param flag Set or clear. - * @param mask Mask. - */ - private void setFlag(boolean flag, int mask) { - flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); + return 27; } - /** - * Reags flag mask. - * - * @param mask Mask to read. - * @return Flag value. - */ - private boolean isFlag(int mask) { - return (flags & mask) != 0; - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString()); http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index c3d3ca9..ab7aa6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -25,16 +25,13 @@ import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -50,19 +47,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri public static final int CACHE_MSG_IDX = nextIndexId(); /** Future version. */ - private GridCacheVersion futVer; + private long futId; - /** Failed keys. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List failedKeys; - - /** Update error. */ - @GridDirectTransient - private IgniteCheckedException err; - - /** Serialized update error. */ - private byte[] errBytes; + /** */ + private UpdateErrors errs; /** Evicted readers. */ @GridToStringInclude @@ -70,7 +58,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri private List nearEvicted; /** */ - private int partId = -1; + private int partId; /** * Empty constructor required by {@link Externalizable}. @@ -81,12 +69,14 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** * @param cacheId Cache ID. - * @param futVer Future version. + * @param partId Partition. + * @param futId Future ID. * @param addDepInfo Deployment info. */ - public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) { + public GridDhtAtomicUpdateResponse(int cacheId, int partId, long futId, boolean addDepInfo) { this.cacheId = cacheId; - this.futVer = futVer; + this.partId = partId; + this.futId = futId; this.addDepInfo = addDepInfo; } @@ -98,8 +88,8 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** * @return Future version. */ - public GridCacheVersion futureVersion() { - return futVer; + public long futureId() { + return futId; } /** @@ -108,63 +98,29 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri * @param err Error. */ public void onError(IgniteCheckedException err){ - this.err = err; + if (errs == null) + errs = new UpdateErrors(); + + errs.onError(err); } /** {@inheritDoc} */ @Override public IgniteCheckedException error() { - return err; - } - - /** - * @return Failed keys. - */ - public Collection failedKeys() { - return failedKeys; - } - - /** - * Adds key to collection of failed keys. - * - * @param key Key to add. - * @param e Error cause. - */ - public void addFailedKey(KeyCacheObject key, Throwable e) { - if (failedKeys == null) - failedKeys = new ArrayList<>(); - - failedKeys.add(key); - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); + return errs != null ? errs.error() : null; } /** * @return Evicted readers. */ - public Collection nearEvicted() { + Collection nearEvicted() { return nearEvicted; } /** - * Adds near evicted key.. - * - * @param key Evicted key. - */ - public void addNearEvicted(KeyCacheObject key) { - if (nearEvicted == null) - nearEvicted = new ArrayList<>(); - - nearEvicted.add(key); - } - - /** - * @param partId Partition ID to set. + * @param nearEvicted Evicted near cache keys. */ - public void partition(int partId) { - this.partId = partId; + void nearEvicted(List nearEvicted) { + this.nearEvicted = nearEvicted; } /** {@inheritDoc} */ @@ -178,12 +134,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri GridCacheContext cctx = ctx.cacheContext(cacheId); - prepareMarshalCacheObjects(failedKeys, cctx); - prepareMarshalCacheObjects(nearEvicted, cctx); - if (errBytes == null) - errBytes = U.marshal(ctx, err); + if (errs != null) + errs.prepareMarshal(this, cctx); } /** {@inheritDoc} */ @@ -192,12 +146,10 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri GridCacheContext cctx = ctx.cacheContext(cacheId); - finishUnmarshalCacheObjects(failedKeys, cctx, ldr); - finishUnmarshalCacheObjects(nearEvicted, cctx, ldr); - if (errBytes != null && err == null) - err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + if (errs != null) + errs.finishUnmarshal(this, cctx, ldr); } /** {@inheritDoc} */ @@ -226,30 +178,24 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri switch (writer.state()) { case 3: - if (!writer.writeByteArray("errBytes", errBytes)) + if (!writer.writeMessage("errs", errs)) return false; writer.incrementState(); case 4: - if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) + if (!writer.writeLong("futId", futId)) return false; writer.incrementState(); case 5: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 6: if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 7: + case 6: if (!writer.writeInt("partId", partId)) return false; @@ -272,7 +218,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri switch (reader.state()) { case 3: - errBytes = reader.readByteArray("errBytes"); + errs = reader.readMessage("errs"); if (!reader.isLastRead()) return false; @@ -280,7 +226,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri reader.incrementState(); case 4: - failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); + futId = reader.readLong("futId"); if (!reader.isLastRead()) return false; @@ -288,14 +234,6 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri reader.incrementState(); case 5: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -303,7 +241,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri reader.incrementState(); - case 7: + case 6: partId = reader.readInt("partId"); if (!reader.isLastRead()) @@ -323,7 +261,7 @@ public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements Gri /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 7; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/cbc472fe/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java index 61deeee..6811236 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractSingleUpdateRequest.java @@ -18,19 +18,13 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.io.Externalizable; -import java.nio.ByteBuffer; import java.util.UUID; import javax.cache.expiry.ExpiryPolicy; -import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -44,59 +38,6 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear /** */ private static final CacheEntryPredicate[] NO_FILTER = new CacheEntryPredicate[0]; - /** Fast map flag mask. */ - private static final int FAST_MAP_FLAG_MASK = 0x1; - - /** Flag indicating whether request contains primary keys. */ - private static final int HAS_PRIMARY_FLAG_MASK = 0x2; - - /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ - private static final int TOP_LOCKED_FLAG_MASK = 0x4; - - /** Skip write-through to a persistent storage. */ - private static final int SKIP_STORE_FLAG_MASK = 0x8; - - /** */ - private static final int CLIENT_REQ_FLAG_MASK = 0x10; - - /** Keep binary flag. */ - private static final int KEEP_BINARY_FLAG_MASK = 0x20; - - /** Return value flag. */ - private static final int RET_VAL_FLAG_MASK = 0x40; - - /** Target node ID. */ - @GridDirectTransient - protected UUID nodeId; - - /** Future version. */ - protected GridCacheVersion futVer; - - /** Update version. Set to non-null if fastMap is {@code true}. */ - private GridCacheVersion updateVer; - - /** Topology version. */ - protected AffinityTopologyVersion topVer; - - /** Write synchronization mode. */ - protected CacheWriteSynchronizationMode syncMode; - - /** Update operation. */ - protected GridCacheOperation op; - - /** Subject ID. */ - protected UUID subjId; - - /** Task name hash. */ - protected int taskNameHash; - - /** */ - @GridDirectTransient - private GridNearAtomicUpdateResponse res; - - /** Compressed boolean flags. */ - protected byte flags; - /** * Empty constructor required by {@link Externalizable}. */ @@ -109,9 +50,7 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear * * @param cacheId Cache ID. * @param nodeId Node ID. - * @param futVer Future version. - * @param fastMap Fast map scheme flag. - * @param updateVer Update version set if fast map is performed. + * @param futId Future ID. * @param topVer Topology version. * @param topLocked Topology locked flag. * @param syncMode Synchronization mode. @@ -121,15 +60,12 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear * @param taskNameHash Task name hash code. * @param skipStore Skip write-through to a persistent storage. * @param keepBinary Keep binary flag. - * @param clientReq Client node request flag. * @param addDepInfo Deployment info flag. */ protected GridNearAtomicAbstractSingleUpdateRequest( int cacheId, UUID nodeId, - GridCacheVersion futVer, - boolean fastMap, - @Nullable GridCacheVersion updateVer, + long futId, @NotNull AffinityTopologyVersion topVer, boolean topLocked, CacheWriteSynchronizationMode syncMode, @@ -137,91 +73,25 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear boolean retval, @Nullable UUID subjId, int taskNameHash, + boolean mappingKnown, boolean skipStore, boolean keepBinary, - boolean clientReq, boolean addDepInfo ) { - assert futVer != null; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.updateVer = updateVer; - this.topVer = topVer; - this.syncMode = syncMode; - this.op = op; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.addDepInfo = addDepInfo; - - fastMap(fastMap); - topologyLocked(topLocked); - returnValue(retval); - skipStore(skipStore); - keepBinary(keepBinary); - clientRequest(clientReq); - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** - * @return Mapped node ID. - */ - @Override public UUID nodeId() { - return nodeId; - } - - /** - * @param nodeId Node ID. - */ - @Override public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** - * @return Subject ID. - */ - @Override public UUID subjectId() { - return subjId; - } - - /** - * @return Task name hash. - */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** - * @return Future version. - */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** - * @return Update version for fast-map request. - */ - @Override public GridCacheVersion updateVersion() { - return updateVer; - } - - /** - * @return Topology version. - */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * @return Cache write synchronization mode. - */ - @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; + super(cacheId, + nodeId, + futId, + topVer, + topLocked, + syncMode, + op, + retval, + subjId, + taskNameHash, + mappingKnown, + skipStore, + keepBinary, + addDepInfo); } /** @@ -232,331 +102,14 @@ public abstract class GridNearAtomicAbstractSingleUpdateRequest extends GridNear } /** - * @return Update operation. - */ - @Override public GridCacheOperation operation() { - return op; - } - - /** * @return Optional arguments for entry processor. */ @Override @Nullable public Object[] invokeArguments() { return null; } - /** - * @param res Response. - * @return {@code True} if current response was {@code null}. - */ - @Override public boolean onResponse(GridNearAtomicUpdateResponse res) { - if (this.res == null) { - this.res = res; - - return true; - } - - return false; - } - - /** - * @return Response. - */ - @Override @Nullable public GridNearAtomicUpdateResponse response() { - return res; - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public IgniteLogger messageLogger(GridCacheSharedContext ctx) { - return ctx.atomicMessageLogger(); - } - - /** - * @return Flag indicating whether this is fast-map udpate. - */ - @Override public boolean fastMap() { - return isFlag(FAST_MAP_FLAG_MASK); - } - - /** - * Sets fastMap flag value. - */ - public void fastMap(boolean val) { - setFlag(val, FAST_MAP_FLAG_MASK); - } - - /** - * @return Topology locked flag. - */ - @Override public boolean topologyLocked() { - return isFlag(TOP_LOCKED_FLAG_MASK); - } - - /** - * Sets topologyLocked flag value. - */ - public void topologyLocked(boolean val) { - setFlag(val, TOP_LOCKED_FLAG_MASK); - } - - /** - * @return {@code True} if request sent from client node. - */ - @Override public boolean clientRequest() { - return isFlag(CLIENT_REQ_FLAG_MASK); - } - - /** - * Sets clientRequest flag value. - */ - public void clientRequest(boolean val) { - setFlag(val, CLIENT_REQ_FLAG_MASK); - } - - /** - * @return Return value flag. - */ - @Override public boolean returnValue() { - return isFlag(RET_VAL_FLAG_MASK); - } - - /** - * Sets returnValue flag value. - */ - public void returnValue(boolean val) { - setFlag(val, RET_VAL_FLAG_MASK); - } - - /** - * @return Skip write-through to a persistent storage. - */ - @Override public boolean skipStore() { - return isFlag(SKIP_STORE_FLAG_MASK); - } - - /** - * Sets skipStore flag value. - */ - public void skipStore(boolean val) { - setFlag(val, SKIP_STORE_FLAG_MASK); - } - - /** - * @return Keep binary flag. - */ - @Override public boolean keepBinary() { - return isFlag(KEEP_BINARY_FLAG_MASK); - } - - /** - * Sets keepBinary flag value. - */ - public void keepBinary(boolean val) { - setFlag(val, KEEP_BINARY_FLAG_MASK); - } - - /** - * @return Flag indicating whether this request contains primary keys. - */ - @Override public boolean hasPrimary() { - return isFlag(HAS_PRIMARY_FLAG_MASK); - } - - /** - * Sets hasPrimary flag value. - */ - public void hasPrimary(boolean val) { - setFlag(val, HAS_PRIMARY_FLAG_MASK); - } - /** {@inheritDoc} */ @Nullable @Override public CacheEntryPredicate[] filter() { return NO_FILTER; } - - /** - * Sets flag mask. - * - * @param flag Set or clear. - * @param mask Mask. - */ - private void setFlag(boolean flag, int mask) { - flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask); - } - - /** - * Reads flag mask. - * - * @param mask Mask to read. - * @return Flag value. - */ - private boolean isFlag(int mask) { - return (flags & mask) != 0; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeByte("flags", flags)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeUuid("subjId", subjId)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeMessage("updateVer", updateVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - flags = reader.readByte("flags"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - byte opOrd; - - opOrd = reader.readByte("op"); - - if (!reader.isLastRead()) - return false; - - op = GridCacheOperation.fromOrdinal(opOrd); - - reader.incrementState(); - - case 6: - subjId = reader.readUuid("subjId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - byte syncModeOrd; - - syncModeOrd = reader.readByte("syncMode"); - - if (!reader.isLastRead()) - return false; - - syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); - - reader.incrementState(); - - case 8: - taskNameHash = reader.readInt("taskNameHash"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - updateVer = reader.readMessage("updateVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridNearAtomicAbstractSingleUpdateRequest.class); - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 11; - } }