Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A6E4A18996 for ; Thu, 25 Feb 2016 12:31:05 +0000 (UTC) Received: (qmail 168 invoked by uid 500); 25 Feb 2016 12:31:02 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 99859 invoked by uid 500); 25 Feb 2016 12:31:02 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 98174 invoked by uid 99); 25 Feb 2016 12:31:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Feb 2016 12:31:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EFE9DE8F03; Thu, 25 Feb 2016 12:31:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 25 Feb 2016 12:31:26 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [27/51] [abbrv] ignite git commit: ignite-2523 : Created GridDhtAtomicSingleUpdateRequest optimized implementation. http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java new file mode 100644 index 0000000..6a69ccc --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateResponse.java @@ -0,0 +1,296 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheDeployable; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import java.io.Externalizable; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +public class GridDhtAtomicSingleUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridDhtAtomicUpdateResponse { + /** */ + private static final long serialVersionUID = 0L; + + /** Message index. */ + public static final int CACHE_MSG_IDX = nextIndexId(); + + /** Future version. */ + private GridCacheVersion futVer; + + /** Failed keys. */ + @GridToStringInclude + @GridDirectCollection(KeyCacheObject.class) + private List failedKeys; + + /** Update error. */ + @GridDirectTransient + private IgniteCheckedException err; + + /** Serialized update error. */ + private byte[] errBytes; + + /** Evicted readers. */ + @GridToStringInclude + @GridDirectCollection(KeyCacheObject.class) + private List nearEvicted; + + /** + * Empty constructor required by {@link Externalizable}. + */ + public GridDhtAtomicSingleUpdateResponse() { + // No-op. + } + + /** + * @param cacheId Cache ID. + * @param futVer Future version. + * @param addDepInfo Deployment info. + */ + public GridDhtAtomicSingleUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) { + this.cacheId = cacheId; + this.futVer = futVer; + this.addDepInfo = addDepInfo; + } + + /** {@inheritDoc} */ + @Override public int lookupIndex() { + return CACHE_MSG_IDX; + } + + /** + * @return Future version. + */ + @Override public GridCacheVersion futureVersion() { + return futVer; + } + + /** + * Sets update error. + * + * @param err Error. + */ + @Override public void onError(IgniteCheckedException err) { + this.err = err; + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException error() { + return err; + } + + /** + * @return Failed keys. + */ + @Override public Collection failedKeys() { + return failedKeys; + } + + /** + * Adds key to collection of failed keys. + * + * @param key Key to add. + * @param e Error cause. + */ + @Override public void addFailedKey(KeyCacheObject key, Throwable e) { + if (failedKeys == null) + failedKeys = new ArrayList<>(); + + failedKeys.add(key); + + if (err == null) + err = new IgniteCheckedException("Failed to update keys on primary node."); + + err.addSuppressed(e); + } + + /** + * @return Evicted readers. + */ + @Override public Collection nearEvicted() { + return nearEvicted; + } + + /** + * Adds near evicted key.. + * + * @param key Evicted key. + */ + @Override public void addNearEvicted(KeyCacheObject key) { + if (nearEvicted == null) + nearEvicted = new ArrayList<>(); + + nearEvicted.add(key); + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + prepareMarshalCacheObjects(failedKeys, cctx); + + prepareMarshalCacheObjects(nearEvicted, cctx); + + if (errBytes == null) + errBytes = ctx.marshaller().marshal(err); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + finishUnmarshalCacheObjects(failedKeys, cctx, ldr); + + finishUnmarshalCacheObjects(nearEvicted, cctx, ldr); + + if (errBytes != null && err == null) + err = ctx.marshaller().unmarshal(errBytes, ldr); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeMessage("futVer", futVer)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + futVer = reader.readMessage("futVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridDhtAtomicSingleUpdateResponse.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 39; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 7; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridDhtAtomicSingleUpdateResponse.class, this); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 3a31700..e19a11a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -237,19 +238,34 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement GridDhtAtomicUpdateRequest updateReq = mappings.get(nodeId); if (updateReq == null) { - updateReq = new GridDhtAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - writeVer, - syncMode, - topVer, - forceTransformBackups, - this.updateReq.subjectId(), - this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled(), - this.updateReq.keepBinary()); + if (this.updateReq instanceof GridNearAtomicSingleUpdateRequest) + updateReq = new GridDhtAtomicSingleUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + syncMode, + topVer, + forceTransformBackups, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); + else + updateReq = new GridDhtAtomicMultipleUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + syncMode, + topVer, + forceTransformBackups, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); mappings.put(nodeId, updateReq); } @@ -309,19 +325,34 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement if (node == null) continue; - updateReq = new GridDhtAtomicUpdateRequest( - cctx.cacheId(), - nodeId, - futVer, - writeVer, - syncMode, - topVer, - forceTransformBackups, - this.updateReq.subjectId(), - this.updateReq.taskNameHash(), - forceTransformBackups ? this.updateReq.invokeArguments() : null, - cctx.deploymentEnabled(), - this.updateReq.keepBinary()); + if (this.updateReq instanceof GridNearAtomicSingleUpdateRequest) + updateReq = new GridDhtAtomicSingleUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + syncMode, + topVer, + forceTransformBackups, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); + else + updateReq = new GridDhtAtomicMultipleUpdateRequest( + cctx.cacheId(), + nodeId, + futVer, + writeVer, + syncMode, + topVer, + forceTransformBackups, + this.updateReq.subjectId(), + this.updateReq.taskNameHash(), + forceTransformBackups ? this.updateReq.invokeArguments() : null, + cctx.deploymentEnabled(), + this.updateReq.keepBinary()); mappings.put(nodeId, updateReq); } @@ -348,7 +379,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement if (!mappings.isEmpty()) { Collection hndKeys = new ArrayList<>(keys.size()); - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { + exit: + for (GridDhtAtomicUpdateRequest req : mappings.values()) { for (int i = 0; i < req.size(); i++) { KeyCacheObject key = req.key(i); @@ -416,7 +448,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter implement if (log.isDebugEnabled()) log.debug("Sending DHT atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + cctx.io().send(req.nodeId(), (GridCacheMessage)req, cctx.ioPolicy()); } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send update request to backup node because it left grid: " + http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7cc276f..0ab67a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -1,248 +1,44 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. */ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import javax.cache.processor.EntryProcessor; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.UUID; -/** - * Lite dht cache backup update request. - */ -public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable { - /** */ - private static final long serialVersionUID = 0L; - - /** Message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - - /** Node ID. */ - private UUID nodeId; - - /** Future version. */ - private GridCacheVersion futVer; - - /** Write version. */ - private GridCacheVersion writeVer; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - - /** Keys to update. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List keys; - - /** Values to update. */ - @GridToStringInclude - @GridDirectCollection(CacheObject.class) - private List vals; - - /** Previous values. */ - @GridToStringInclude - @GridDirectCollection(CacheObject.class) - private List prevVals; - - /** Conflict versions. */ - @GridDirectCollection(GridCacheVersion.class) - private List conflictVers; - - /** TTLs. */ - private GridLongList ttls; - - /** Conflict expire time. */ - private GridLongList conflictExpireTimes; - - /** Near TTLs. */ - private GridLongList nearTtls; - - /** Near expire times. */ - private GridLongList nearExpireTimes; - - /** Write synchronization mode. */ - private CacheWriteSynchronizationMode syncMode; - - /** Near cache keys to update. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List nearKeys; - - /** Values to update. */ - @GridToStringInclude - @GridDirectCollection(CacheObject.class) - private List nearVals; - - /** Force transform backups flag. */ - private boolean forceTransformBackups; - - /** Entry processors. */ - @GridDirectTransient - private List> entryProcessors; - - /** Entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List entryProcessorsBytes; - - /** Near entry processors. */ - @GridDirectTransient - private List> nearEntryProcessors; - - /** Near entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List nearEntryProcessorsBytes; - - /** Optional arguments for entry processor. */ - @GridDirectTransient - private Object[] invokeArgs; - - /** Entry processor arguments bytes. */ - private byte[][] invokeArgsBytes; - - /** Subject ID. */ - private UUID subjId; - - /** Task name hash. */ - private int taskNameHash; - - /** Partition. */ - private GridLongList updateCntrs; - - /** On response flag. Access should be synced on future. */ - @GridDirectTransient - private boolean onRes; - - /** */ - @GridDirectTransient - private List partIds; - - /** */ - @GridDirectTransient - private List locPrevVals; - - /** Keep binary flag. */ - private boolean keepBinary; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridDhtAtomicUpdateRequest() { - // No-op. - } - - /** - * Constructor. - * - * @param cacheId Cache ID. - * @param nodeId Node ID. - * @param futVer Future version. - * @param writeVer Write version for cache values. - * @param invokeArgs Optional arguments for entry processor. - * @param syncMode Cache write synchronization mode. - * @param topVer Topology version. - * @param forceTransformBackups Force transform backups flag. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. - * @param addDepInfo Deployment info. - */ - public GridDhtAtomicUpdateRequest( - int cacheId, - UUID nodeId, - GridCacheVersion futVer, - GridCacheVersion writeVer, - CacheWriteSynchronizationMode syncMode, - @NotNull AffinityTopologyVersion topVer, - boolean forceTransformBackups, - UUID subjId, - int taskNameHash, - Object[] invokeArgs, - boolean addDepInfo, - boolean keepBinary - ) { - assert invokeArgs == null || forceTransformBackups; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.writeVer = writeVer; - this.syncMode = syncMode; - this.topVer = topVer; - this.forceTransformBackups = forceTransformBackups; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.invokeArgs = invokeArgs; - this.addDepInfo = addDepInfo; - this.keepBinary = keepBinary; - - keys = new ArrayList<>(); - partIds = new ArrayList<>(); - locPrevVals = new ArrayList<>(); - - if (forceTransformBackups) { - entryProcessors = new ArrayList<>(); - entryProcessorsBytes = new ArrayList<>(); - } - else - vals = new ArrayList<>(); - } +public interface GridDhtAtomicUpdateRequest { - /** - * @return Force transform backups flag. - */ - public boolean forceTransformBackups() { - return forceTransformBackups; - } + boolean forceTransformBackups(); - /** - * @param key Key to add. - * @param val Value, {@code null} if should be removed. - * @param entryProcessor Entry processor. - * @param ttl TTL (optional). - * @param conflictExpireTime Conflict expire time (optional). - * @param conflictVer Conflict version (optional). - * @param addPrevVal If {@code true} adds previous value. - * @param prevVal Previous value. - */ - public void addWriteValue(KeyCacheObject key, + void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor entryProcessor, long ttl, @@ -251,815 +47,85 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateIdx) { - keys.add(key); - - partIds.add(partId); - - locPrevVals.add(prevVal); - - if (forceTransformBackups) { - assert entryProcessor != null; - - entryProcessors.add(entryProcessor); - } - else - vals.add(val); - - if (addPrevVal) { - if (prevVals == null) - prevVals = new ArrayList<>(); - - prevVals.add(prevVal); - } - - if (updateIdx != null) { - if (updateCntrs == null) - updateCntrs = new GridLongList(); - - updateCntrs.add(updateIdx); - } - - // In case there is no conflict, do not create the list. - if (conflictVer != null) { - if (conflictVers == null) { - conflictVers = new ArrayList<>(); - - for (int i = 0; i < keys.size() - 1; i++) - conflictVers.add(null); - } - - conflictVers.add(conflictVer); - } - else if (conflictVers != null) - conflictVers.add(null); - - if (ttl >= 0) { - if (ttls == null) { - ttls = new GridLongList(keys.size()); - - for (int i = 0; i < keys.size() - 1; i++) - ttls.add(CU.TTL_NOT_CHANGED); - } - } + @Nullable Long updateIdx); - if (ttls != null) - ttls.add(ttl); - - if (conflictExpireTime >= 0) { - if (conflictExpireTimes == null) { - conflictExpireTimes = new GridLongList(keys.size()); - - for (int i = 0; i < keys.size() - 1; i++) - conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE); - } - } - - if (conflictExpireTimes != null) - conflictExpireTimes.add(conflictExpireTime); - } - - /** - * @param key Key to add. - * @param val Value, {@code null} if should be removed. - * @param entryProcessor Entry processor. - * @param ttl TTL. - * @param expireTime Expire time. - */ - public void addNearWriteValue(KeyCacheObject key, + void addNearWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor entryProcessor, long ttl, - long expireTime) { - if (nearKeys == null) { - nearKeys = new ArrayList<>(); - - if (forceTransformBackups) { - nearEntryProcessors = new ArrayList<>(); - nearEntryProcessorsBytes = new ArrayList<>(); - } - else - nearVals = new ArrayList<>(); - } - - nearKeys.add(key); - - if (forceTransformBackups) { - assert entryProcessor != null; - - nearEntryProcessors.add(entryProcessor); - } - else - nearVals.add(val); - - if (ttl >= 0) { - if (nearTtls == null) { - nearTtls = new GridLongList(nearKeys.size()); - - for (int i = 0; i < nearKeys.size() - 1; i++) - nearTtls.add(CU.TTL_NOT_CHANGED); - } - } - - if (nearTtls != null) - nearTtls.add(ttl); - - if (expireTime >= 0) { - if (nearExpireTimes == null) { - nearExpireTimes = new GridLongList(nearKeys.size()); - - for (int i = 0; i < nearKeys.size() - 1; i++) - nearExpireTimes.add(CU.EXPIRE_TIME_CALCULATE); - } - } - - if (nearExpireTimes != null) - nearExpireTimes.add(expireTime); - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** - * @return Node ID. - */ - public UUID nodeId() { - return nodeId; - } - - /** - * @return Subject ID. - */ - public UUID subjectId() { - return subjId; - } - - /** - * @return Task name. - */ - public int taskNameHash() { - return taskNameHash; - } - - /** - * @return Keys size. - */ - public int size() { - return keys.size(); - } - - /** - * @return Keys size. - */ - public int nearSize() { - return nearKeys != null ? nearKeys.size() : 0; - } - - /** - * @return Version assigned on primary node. - */ - public GridCacheVersion futureVersion() { - return futVer; - } - - /** - * @return Write version. - */ - public GridCacheVersion writeVersion() { - return writeVer; - } - - /** - * @return Cache write synchronization mode. - */ - public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; - } - - /** - * @return Topology version. - */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** - * @return Keys. - */ - public Collection keys() { - return keys; - } - - /** - * @param idx Key index. - * @return Key. - */ - public KeyCacheObject key(int idx) { - return keys.get(idx); - } - - /** - * @param idx Partition index. - * @return Partition id. - */ - public int partitionId(int idx) { - return partIds.get(idx); - } - - /** - * @param updCntr Update counter. - * @return Update counter. - */ - public Long updateCounter(int updCntr) { - if (updateCntrs != null && updCntr < updateCntrs.size()) - return updateCntrs.get(updCntr); - - return null; - } - - /** - * @param idx Near key index. - * @return Key. - */ - public KeyCacheObject nearKey(int idx) { - return nearKeys.get(idx); - } - - /** - * @return Keep binary flag. - */ - public boolean keepBinary() { - return keepBinary; - } - - /** - * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject value(int idx) { - if (vals != null) - return vals.get(idx); - - return null; - } - - /** - * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject previousValue(int idx) { - if (prevVals != null) - return prevVals.get(idx); - - return null; - } - - /** - * @param idx Key index. - * @return Value. - */ - @Nullable public CacheObject localPreviousValue(int idx) { - return locPrevVals.get(idx); - } - - /** - * @param idx Key index. - * @return Entry processor. - */ - @Nullable public EntryProcessor entryProcessor(int idx) { - return entryProcessors == null ? null : entryProcessors.get(idx); - } - - /** - * @param idx Near key index. - * @return Value. - */ - @Nullable public CacheObject nearValue(int idx) { - if (nearVals != null) - return nearVals.get(idx); - - return null; - } - - /** - * @param idx Key index. - * @return Transform closure. - */ - @Nullable public EntryProcessor nearEntryProcessor(int idx) { - return nearEntryProcessors == null ? null : nearEntryProcessors.get(idx); - } - - /** - * @param idx Index. - * @return Conflict version. - */ - @Nullable public GridCacheVersion conflictVersion(int idx) { - if (conflictVers != null) { - assert idx >= 0 && idx < conflictVers.size(); - - return conflictVers.get(idx); - } - - return null; - } - - /** - * @param idx Index. - * @return TTL. - */ - public long ttl(int idx) { - if (ttls != null) { - assert idx >= 0 && idx < ttls.size(); - - return ttls.get(idx); - } - - return CU.TTL_NOT_CHANGED; - } - - /** - * @param idx Index. - * @return TTL for near cache update. - */ - public long nearTtl(int idx) { - if (nearTtls != null) { - assert idx >= 0 && idx < nearTtls.size(); - - return nearTtls.get(idx); - } - - return CU.TTL_NOT_CHANGED; - } - - /** - * @param idx Index. - * @return Conflict expire time. - */ - public long conflictExpireTime(int idx) { - if (conflictExpireTimes != null) { - assert idx >= 0 && idx < conflictExpireTimes.size(); - - return conflictExpireTimes.get(idx); - } - - return CU.EXPIRE_TIME_CALCULATE; - } - - /** - * @param idx Index. - * @return Expire time for near cache update. - */ - public long nearExpireTime(int idx) { - if (nearExpireTimes != null) { - assert idx >= 0 && idx < nearExpireTimes.size(); - - return nearExpireTimes.get(idx); - } - - return CU.EXPIRE_TIME_CALCULATE; - } - - /** - * @return {@code True} if on response flag changed. - */ - public boolean onResponse() { - return !onRes && (onRes = true); - } - - /** - * @return Optional arguments for entry processor. - */ - @Nullable public Object[] invokeArguments() { - return invokeArgs; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalCacheObjects(keys, cctx); - - prepareMarshalCacheObjects(vals, cctx); - - prepareMarshalCacheObjects(nearKeys, cctx); - - prepareMarshalCacheObjects(nearVals, cctx); - - prepareMarshalCacheObjects(prevVals, cctx); - - if (forceTransformBackups) { - // force addition of deployment info for entry processors if P2P is enabled globally. - if (!addDepInfo && ctx.deploymentEnabled()) - addDepInfo = true; - - if (invokeArgsBytes == null) - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); - - if (entryProcessorsBytes == null) - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); - - if (nearEntryProcessorsBytes == null) - nearEntryProcessorsBytes = marshalCollection(nearEntryProcessors, cctx); - } - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); - - finishUnmarshalCacheObjects(vals, cctx, ldr); - - finishUnmarshalCacheObjects(nearKeys, cctx, ldr); - - finishUnmarshalCacheObjects(nearVals, cctx, ldr); - - finishUnmarshalCacheObjects(prevVals, cctx, ldr); - - if (forceTransformBackups) { - if (entryProcessors == null) - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); - - if (invokeArgs == null) - invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); - - if (nearEntryProcessors == null) - nearEntryProcessors = unmarshalCollection(nearEntryProcessorsBytes, ctx, ldr); - } - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeBoolean("forceTransformBackups", forceTransformBackups)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeBoolean("keepBinary", keepBinary)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeCollection("nearEntryProcessorsBytes", nearEntryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) - return false; - - writer.incrementState(); - - case 13: - if (!writer.writeCollection("nearKeys", nearKeys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 14: - if (!writer.writeMessage("nearTtls", nearTtls)) - return false; - - writer.incrementState(); - - case 15: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 16: - if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 17: - if (!writer.writeUuid("subjId", subjId)) - return false; - - writer.incrementState(); - - case 18: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 19: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 20: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 21: - if (!writer.writeMessage("ttls", ttls)) - return false; - - writer.incrementState(); - - case 22: - if (!writer.writeMessage("updateCntrs", updateCntrs)) - return false; - - writer.incrementState(); - - case 23: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 24: - if (!writer.writeMessage("writeVer", writeVer)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - forceTransformBackups = reader.readBoolean("forceTransformBackups"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - keepBinary = reader.readBoolean("keepBinary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - nearEntryProcessorsBytes = reader.readCollection("nearEntryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - nearExpireTimes = reader.readMessage("nearExpireTimes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: - nearKeys = reader.readCollection("nearKeys", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 14: - nearTtls = reader.readMessage("nearTtls"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); + long expireTime); - case 15: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); + int lookupIndex(); - if (!reader.isLastRead()) - return false; + UUID nodeId(); - reader.incrementState(); + UUID subjectId(); - case 16: - prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); + int taskNameHash(); - if (!reader.isLastRead()) - return false; + int size(); - reader.incrementState(); + int nearSize(); - case 17: - subjId = reader.readUuid("subjId"); + GridCacheVersion futureVersion(); - if (!reader.isLastRead()) - return false; + GridCacheVersion writeVersion(); - reader.incrementState(); + CacheWriteSynchronizationMode writeSynchronizationMode(); - case 18: - byte syncModeOrd; + AffinityTopologyVersion topologyVersion(); - syncModeOrd = reader.readByte("syncMode"); + Collection keys(); - if (!reader.isLastRead()) - return false; + KeyCacheObject key(int idx); - syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); + int partitionId(int idx); - reader.incrementState(); + Long updateCounter(int updCntr); - case 19: - taskNameHash = reader.readInt("taskNameHash"); + KeyCacheObject nearKey(int idx); - if (!reader.isLastRead()) - return false; + boolean keepBinary(); - reader.incrementState(); + @Nullable CacheObject value(int idx); - case 20: - topVer = reader.readMessage("topVer"); + @Nullable CacheObject previousValue(int idx); - if (!reader.isLastRead()) - return false; + @Nullable CacheObject localPreviousValue(int idx); - reader.incrementState(); + @Nullable EntryProcessor entryProcessor(int idx); - case 21: - ttls = reader.readMessage("ttls"); + @Nullable CacheObject nearValue(int idx); - if (!reader.isLastRead()) - return false; + @Nullable EntryProcessor nearEntryProcessor(int idx); - reader.incrementState(); + @Nullable GridCacheVersion conflictVersion(int idx); - case 22: - updateCntrs = reader.readMessage("updateCntrs"); + long ttl(int idx); - if (!reader.isLastRead()) - return false; + long nearTtl(int idx); - reader.incrementState(); + long conflictExpireTime(int idx); - case 23: - vals = reader.readCollection("vals", MessageCollectionItemType.MSG); + long nearExpireTime(int idx); - if (!reader.isLastRead()) - return false; + boolean onResponse(); - reader.incrementState(); + @Nullable Object[] invokeArguments(); - case 24: - writeVer = reader.readMessage("writeVer"); + void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException; - if (!reader.isLastRead()) - return false; + void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException; - reader.incrementState(); + boolean addDeploymentInfo(); - } + boolean writeTo(ByteBuffer buf, MessageWriter writer); - return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class); - } + boolean readFrom(ByteBuffer buf, MessageReader reader); - /** {@inheritDoc} */ - @Override public byte directType() { - return 38; - } + byte directType(); - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 25; - } + byte fieldsCount(); - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtAtomicUpdateRequest.class, this, "super", super.toString()); - } + IgniteCheckedException classError(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java index 8f1d9a2..a74fed6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateResponse.java @@ -1,297 +1,63 @@ /* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * * Licensed to the Apache Software Foundation (ASF) under one or more + * * contributor license agreements. See the NOTICE file distributed with + * * this work for additional information regarding copyright ownership. + * * The ASF licenses this file to You under the Apache License, Version 2.0 + * * (the "License"); you may not use this file except in compliance with + * * the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. */ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import java.nio.ByteBuffer; +import java.util.Collection; -/** - * DHT atomic cache backup update response. - */ -public class GridDhtAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable { - /** */ - private static final long serialVersionUID = 0L; - - /** Message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - - /** Future version. */ - private GridCacheVersion futVer; - - /** Failed keys. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List failedKeys; - - /** Update error. */ - @GridDirectTransient - private IgniteCheckedException err; - - /** Serialized update error. */ - private byte[] errBytes; - - /** Evicted readers. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List nearEvicted; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridDhtAtomicUpdateResponse() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param futVer Future version. - * @param addDepInfo Deployment info. - */ - public GridDhtAtomicUpdateResponse(int cacheId, GridCacheVersion futVer, boolean addDepInfo) { - this.cacheId = cacheId; - this.futVer = futVer; - this.addDepInfo = addDepInfo; - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** - * @return Future version. - */ - public GridCacheVersion futureVersion() { - return futVer; - } - - /** - * Sets update error. - * - * @param err Error. - */ - public void onError(IgniteCheckedException err){ - this.err = err; - } - - /** {@inheritDoc} */ - @Override public IgniteCheckedException error() { - return err; - } - - /** - * @return Failed keys. - */ - public Collection failedKeys() { - return failedKeys; - } - - /** - * Adds key to collection of failed keys. - * - * @param key Key to add. - * @param e Error cause. - */ - public void addFailedKey(KeyCacheObject key, Throwable e) { - if (failedKeys == null) - failedKeys = new ArrayList<>(); - - failedKeys.add(key); - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); - } - - /** - * @return Evicted readers. - */ - public Collection nearEvicted() { - return nearEvicted; - } - - /** - * Adds near evicted key.. - * - * @param key Evicted key. - */ - public void addNearEvicted(KeyCacheObject key) { - if (nearEvicted == null) - nearEvicted = new ArrayList<>(); - - nearEvicted.add(key); - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalCacheObjects(failedKeys, cctx); - - prepareMarshalCacheObjects(nearEvicted, cctx); - - if (errBytes == null) - errBytes = ctx.marshaller().marshal(err); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(failedKeys, cctx, ldr); - - finishUnmarshalCacheObjects(nearEvicted, cctx, ldr); - - if (errBytes != null && err == null) - err = ctx.marshaller().unmarshal(errBytes, ldr); - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeCollection("nearEvicted", nearEvicted, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - errBytes = reader.readByteArray("errBytes"); - - if (!reader.isLastRead()) - return false; +public interface GridDhtAtomicUpdateResponse { + int lookupIndex(); - reader.incrementState(); + GridCacheVersion futureVersion(); - case 4: - failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); + void onError(IgniteCheckedException err); - if (!reader.isLastRead()) - return false; + IgniteCheckedException error(); - reader.incrementState(); + Collection failedKeys(); - case 5: - futVer = reader.readMessage("futVer"); + void addFailedKey(KeyCacheObject key, Throwable e); - if (!reader.isLastRead()) - return false; + Collection nearEvicted(); - reader.incrementState(); + void addNearEvicted(KeyCacheObject key); - case 6: - nearEvicted = reader.readCollection("nearEvicted", MessageCollectionItemType.MSG); + void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException; - if (!reader.isLastRead()) - return false; + void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException; - reader.incrementState(); + boolean addDeploymentInfo(); - } + boolean writeTo(ByteBuffer buf, MessageWriter writer); - return reader.afterMessageRead(GridDhtAtomicUpdateResponse.class); - } + boolean readFrom(ByteBuffer buf, MessageReader reader); - /** {@inheritDoc} */ - @Override public byte directType() { - return 39; - } + byte directType(); - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 7; - } + byte fieldsCount(); - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridDhtAtomicUpdateResponse.class, this); - } + long messageId(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java index b3f7e74..d6eabd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateResponse.java @@ -593,7 +593,7 @@ public class GridNearAtomicSingleUpdateResponse extends GridCacheMessage impleme } - return reader.afterMessageRead(GridNearAtomicMultipleUpdateResponse.class); + return reader.afterMessageRead(GridNearAtomicSingleUpdateResponse.class); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 5aef8e7..168076a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -43,9 +43,9 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java index 50a6114..0633a1e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -27,7 +27,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest; import org.apache.ignite.lang.IgniteInClosure; @@ -140,7 +140,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest commSpi.registerMessage(GridNearAtomicMultipleUpdateRequest.class); commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class); - commSpi.registerMessage(GridDhtAtomicUpdateRequest.class); + commSpi.registerMessage(GridDhtAtomicMultipleUpdateRequest.class); int putCnt = 15; @@ -210,7 +210,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest * @return Count. */ private int dhtRequestsCount(TestCommunicationSpi commSpi) { - return commSpi.messageCount(GridDhtAtomicUpdateRequest.class); + return commSpi.messageCount(GridDhtAtomicMultipleUpdateRequest.class); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/3391c847/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index 0e7755b..e3adc21 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -480,7 +480,7 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA return delay && ( (origMsg instanceof GridNearAtomicMultipleUpdateRequest) || (origMsg instanceof GridNearAtomicSingleUpdateRequest) || - (origMsg instanceof GridDhtAtomicUpdateRequest) + (origMsg instanceof GridDhtAtomicMultipleUpdateRequest) ); } }