Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 31C6F189A8 for ; Thu, 25 Feb 2016 12:31:15 +0000 (UTC) Received: (qmail 535 invoked by uid 500); 25 Feb 2016 12:31:02 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 449 invoked by uid 500); 25 Feb 2016 12:31:02 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 98558 invoked by uid 99); 25 Feb 2016 12:31:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 25 Feb 2016 12:31:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52B6FE8EF9; Thu, 25 Feb 2016 12:31:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vozerov@apache.org To: commits@ignite.apache.org Date: Thu, 25 Feb 2016 12:31:45 -0000 Message-Id: <9665d665d7fb45b6baf17f0b1fd47582@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [46/51] [abbrv] ignite git commit: Review. http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java deleted file mode 100644 index 60193ba..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateRequest.java +++ /dev/null @@ -1,989 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.UUID; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; -import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; - -/** - * Lite DHT cache update request sent from near node to primary node. - */ -public class GridNearAtomicUpdateRequest extends GridCacheMessage - implements GridNearAtomicUpdateRequestInterface, GridCacheDeployable { - /** */ - private static final long serialVersionUID = 0L; - - /** Message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - - /** Target node ID. */ - @GridDirectTransient - private UUID nodeId; - - /** Future version. */ - private GridCacheVersion futVer; - - /** Fast map flag. */ - private boolean fastMap; - - /** Update version. Set to non-null if fastMap is {@code true}. */ - private GridCacheVersion updateVer; - - /** Topology version. */ - private AffinityTopologyVersion topVer; - - /** Topology locked flag. Set if atomic update is performed inside TX or explicit lock. */ - private boolean topLocked; - - /** Write synchronization mode. */ - private CacheWriteSynchronizationMode syncMode; - - /** Update operation. */ - private GridCacheOperation op; - - /** Keys to update. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List keys; - - /** Values to update. */ - @GridDirectCollection(CacheObject.class) - private List vals; - - /** Entry processors. */ - @GridDirectTransient - private List> entryProcessors; - - /** Entry processors bytes. */ - @GridDirectCollection(byte[].class) - private List entryProcessorsBytes; - - /** Optional arguments for entry processor. */ - @GridDirectTransient - private Object[] invokeArgs; - - /** Entry processor arguments bytes. */ - private byte[][] invokeArgsBytes; - - /** Conflict versions. */ - @GridDirectCollection(GridCacheVersion.class) - private List conflictVers; - - /** Conflict TTLs. */ - private GridLongList conflictTtls; - - /** Conflict expire times. */ - private GridLongList conflictExpireTimes; - - /** Return value flag. */ - private boolean retval; - - /** Expiry policy. */ - @GridDirectTransient - private ExpiryPolicy expiryPlc; - - /** Expiry policy bytes. */ - private byte[] expiryPlcBytes; - - /** Filter. */ - private CacheEntryPredicate[] filter; - - /** Flag indicating whether request contains primary keys. */ - private boolean hasPrimary; - - /** Subject ID. */ - private UUID subjId; - - /** Task name hash. */ - private int taskNameHash; - - /** Skip write-through to a persistent storage. */ - private boolean skipStore; - - /** */ - private boolean clientReq; - - /** Keep binary flag. */ - private boolean keepBinary; - - /** */ - @GridDirectTransient - private GridNearAtomicUpdateResponseInterface res; - - /** Maximum possible size of inner collections. */ - @GridDirectTransient - private int initSize; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridNearAtomicUpdateRequest() { - // No-op. - } - - /** - * Constructor. - * - * @param cacheId Cache ID. - * @param nodeId Node ID. - * @param futVer Future version. - * @param fastMap Fast map scheme flag. - * @param updateVer Update version set if fast map is performed. - * @param topVer Topology version. - * @param topLocked Topology locked flag. - * @param syncMode Synchronization mode. - * @param op Cache update operation. - * @param retval Return value required flag. - * @param expiryPlc Expiry policy. - * @param invokeArgs Optional arguments for entry processor. - * @param filter Optional filter for atomic check. - * @param subjId Subject ID. - * @param taskNameHash Task name hash code. - * @param skipStore Skip write-through to a persistent storage. - * @param keepBinary Keep binary flag. - * @param clientReq Client node request flag. - * @param addDepInfo Deployment info flag. - * @param maxEntryCnt Maximum entries count. - */ - public GridNearAtomicUpdateRequest( - int cacheId, - UUID nodeId, - GridCacheVersion futVer, - boolean fastMap, - @Nullable GridCacheVersion updateVer, - @NotNull AffinityTopologyVersion topVer, - boolean topLocked, - CacheWriteSynchronizationMode syncMode, - GridCacheOperation op, - boolean retval, - @Nullable ExpiryPolicy expiryPlc, - @Nullable Object[] invokeArgs, - @Nullable CacheEntryPredicate[] filter, - @Nullable UUID subjId, - int taskNameHash, - boolean skipStore, - boolean keepBinary, - boolean clientReq, - boolean addDepInfo, - int maxEntryCnt - ) { - assert futVer != null; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.fastMap = fastMap; - this.updateVer = updateVer; - - this.topVer = topVer; - this.topLocked = topLocked; - this.syncMode = syncMode; - this.op = op; - this.retval = retval; - this.expiryPlc = expiryPlc; - this.invokeArgs = invokeArgs; - this.filter = filter; - this.subjId = subjId; - this.taskNameHash = taskNameHash; - this.skipStore = skipStore; - this.keepBinary = keepBinary; - this.clientReq = clientReq; - this.addDepInfo = addDepInfo; - - // By default ArrayList expands to array of 10 elements on first add. We cannot guess how many entries - // will be added to request because of unknown affinity distribution. However, we DO KNOW how many keys - // participate in request. As such, we know upper bound of all collections in request. If this bound is lower - // than 10, we use it. - initSize = Math.min(maxEntryCnt, 10); - - keys = new ArrayList<>(initSize); - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** {@inheritDoc} */ - @Override public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** {@inheritDoc} */ - @Override public UUID subjectId() { - return subjId; - } - - /** {@inheritDoc} */ - @Override public int taskNameHash() { - return taskNameHash; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** {@inheritDoc} */ - @Override public boolean fastMap() { - return fastMap; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion updateVersion() { - return updateVer; - } - - /** {@inheritDoc} */ - @Override public AffinityTopologyVersion topologyVersion() { - return topVer; - } - - /** {@inheritDoc} */ - @Override public boolean topologyLocked() { - return topLocked; - } - - /** {@inheritDoc} */ - @Override public boolean clientRequest() { - return clientReq; - } - - /** {@inheritDoc} */ - @Override public CacheWriteSynchronizationMode writeSynchronizationMode() { - return syncMode; - } - - /** {@inheritDoc} */ - @Override public ExpiryPolicy expiry() { - return expiryPlc; - } - - /** {@inheritDoc} */ - @Override public boolean returnValue() { - return retval; - } - - /** {@inheritDoc} */ - @Nullable public CacheEntryPredicate[] filter() { - return filter; - } - - /** {@inheritDoc} */ - @Override public boolean skipStore() { - return skipStore; - } - - /** {@inheritDoc} */ - @Override public boolean keepBinary() { - return keepBinary; - } - - /** - * @param key Key to add. - * @param val Optional update value. - * @param conflictTtl Conflict TTL (optional). - * @param conflictExpireTime Conflict expire time (optional). - * @param conflictVer Conflict version (optional). - * @param primary If given key is primary on this mapping. - */ - @SuppressWarnings("unchecked") - public void addUpdateEntry(KeyCacheObject key, - @Nullable Object val, - long conflictTtl, - long conflictExpireTime, - @Nullable GridCacheVersion conflictVer, - boolean primary) { - EntryProcessor entryProcessor = null; - - if (op == TRANSFORM) { - assert val instanceof EntryProcessor : val; - - entryProcessor = (EntryProcessor)val; - } - - assert val != null || op == DELETE; - - keys.add(key); - - if (entryProcessor != null) { - if (entryProcessors == null) - entryProcessors = new ArrayList<>(initSize); - - entryProcessors.add(entryProcessor); - } - else if (val != null) { - assert val instanceof CacheObject : val; - - if (vals == null) - vals = new ArrayList<>(initSize); - - vals.add((CacheObject)val); - } - - hasPrimary |= primary; - - // In case there is no conflict, do not create the list. - if (conflictVer != null) { - if (conflictVers == null) { - conflictVers = new ArrayList<>(initSize); - - for (int i = 0; i < keys.size() - 1; i++) - conflictVers.add(null); - } - - conflictVers.add(conflictVer); - } - else if (conflictVers != null) - conflictVers.add(null); - - if (conflictTtl >= 0) { - if (conflictTtls == null) { - conflictTtls = new GridLongList(keys.size()); - - for (int i = 0; i < keys.size() - 1; i++) - conflictTtls.add(CU.TTL_NOT_CHANGED); - } - - conflictTtls.add(conflictTtl); - } - - if (conflictExpireTime >= 0) { - if (conflictExpireTimes == null) { - conflictExpireTimes = new GridLongList(keys.size()); - - for (int i = 0; i < keys.size() - 1; i++) - conflictExpireTimes.add(CU.EXPIRE_TIME_CALCULATE); - } - - conflictExpireTimes.add(conflictExpireTime); - } - } - - /** {@inheritDoc} */ - @Override public List keys() { - return keys; - } - - /** {@inheritDoc} */ - @Override public List values() { - return op == TRANSFORM ? entryProcessors : vals; - } - - /** {@inheritDoc} */ - @Override public GridCacheOperation operation() { - return op; - } - - /** {@inheritDoc} */ - @Override @Nullable public Object[] invokeArguments() { - return invokeArgs; - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public CacheObject value(int idx) { - assert op == UPDATE : op; - - return vals.get(idx); - } - - /** {@inheritDoc} */ - @SuppressWarnings("unchecked") - @Override public EntryProcessor entryProcessor(int idx) { - assert op == TRANSFORM : op; - - return entryProcessors.get(idx); - } - - /** {@inheritDoc} */ - @Override public CacheObject writeValue(int idx) { - if (vals != null) - return vals.get(idx); - - return null; - } - - /** {@inheritDoc} */ - @Override @Nullable public List conflictVersions() { - return conflictVers; - } - - /** {@inheritDoc} */ - @Override @Nullable public GridCacheVersion conflictVersion(int idx) { - if (conflictVers != null) { - assert idx >= 0 && idx < conflictVers.size(); - - return conflictVers.get(idx); - } - - return null; - } - - /** {@inheritDoc} */ - @Override public long conflictTtl(int idx) { - if (conflictTtls != null) { - assert idx >= 0 && idx < conflictTtls.size(); - - return conflictTtls.get(idx); - } - - return CU.TTL_NOT_CHANGED; - } - - /** {@inheritDoc} */ - @Override public long conflictExpireTime(int idx) { - if (conflictExpireTimes != null) { - assert idx >= 0 && idx < conflictExpireTimes.size(); - - return conflictExpireTimes.get(idx); - } - - return CU.EXPIRE_TIME_CALCULATE; - } - - /** {@inheritDoc} */ - @Override public boolean hasPrimary() { - return hasPrimary; - } - - /** {@inheritDoc} */ - @Override public boolean onResponse(GridNearAtomicUpdateResponseInterface res) { - if (this.res == null) { - this.res = res; - - return true; - } - - return false; - } - - /** {@inheritDoc} */ - @Override @Nullable public GridNearAtomicUpdateResponseInterface response() { - return res; - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalCacheObjects(keys, cctx); - - if (filter != null) { - boolean hasFilter = false; - - for (CacheEntryPredicate p : filter) { - if (p != null) { - hasFilter = true; - - p.prepareMarshal(cctx); - } - } - - if (!hasFilter) - filter = null; - } - - if (expiryPlc != null && expiryPlcBytes == null) - expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc)); - - if (op == TRANSFORM) { - // force addition of deployment info for entry processors if P2P is enabled globally. - if (!addDepInfo && ctx.deploymentEnabled()) - addDepInfo = true; - - if (entryProcessorsBytes == null) - entryProcessorsBytes = marshalCollection(entryProcessors, cctx); - - if (invokeArgsBytes == null) - invokeArgsBytes = marshalInvokeArguments(invokeArgs, cctx); - } - else - prepareMarshalCacheObjects(vals, cctx); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(keys, cctx, ldr); - - if (op == TRANSFORM) { - if (entryProcessors == null) - entryProcessors = unmarshalCollection(entryProcessorsBytes, ctx, ldr); - - if (invokeArgs == null) - invokeArgs = unmarshalInvokeArguments(invokeArgsBytes, ctx, ldr); - } - else - finishUnmarshalCacheObjects(vals, cctx, ldr); - - if (filter != null) { - for (CacheEntryPredicate p : filter) { - if (p != null) - p.finishUnmarshal(cctx, ldr); - } - } - - if (expiryPlcBytes != null && expiryPlc == null) - expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeBoolean("clientReq", clientReq)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeMessage("conflictExpireTimes", conflictExpireTimes)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeMessage("conflictTtls", conflictTtls)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeCollection("conflictVers", conflictVers, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeCollection("entryProcessorsBytes", entryProcessorsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeByteArray("expiryPlcBytes", expiryPlcBytes)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeBoolean("fastMap", fastMap)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeBoolean("hasPrimary", hasPrimary)) - return false; - - writer.incrementState(); - - case 13: - if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR)) - return false; - - writer.incrementState(); - - case 14: - if (!writer.writeBoolean("keepBinary", keepBinary)) - return false; - - writer.incrementState(); - - case 15: - if (!writer.writeCollection("keys", keys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 16: - if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 17: - if (!writer.writeBoolean("retval", retval)) - return false; - - writer.incrementState(); - - case 18: - if (!writer.writeBoolean("skipStore", skipStore)) - return false; - - writer.incrementState(); - - case 19: - if (!writer.writeUuid("subjId", subjId)) - return false; - - writer.incrementState(); - - case 20: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) - return false; - - writer.incrementState(); - - case 21: - if (!writer.writeInt("taskNameHash", taskNameHash)) - return false; - - writer.incrementState(); - - case 22: - if (!writer.writeBoolean("topLocked", topLocked)) - return false; - - writer.incrementState(); - - case 23: - if (!writer.writeMessage("topVer", topVer)) - return false; - - writer.incrementState(); - - case 24: - if (!writer.writeMessage("updateVer", updateVer)) - return false; - - writer.incrementState(); - - case 25: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - clientReq = reader.readBoolean("clientReq"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - conflictExpireTimes = reader.readMessage("conflictExpireTimes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - conflictTtls = reader.readMessage("conflictTtls"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - conflictVers = reader.readCollection("conflictVers", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - entryProcessorsBytes = reader.readCollection("entryProcessorsBytes", MessageCollectionItemType.BYTE_ARR); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - expiryPlcBytes = reader.readByteArray("expiryPlcBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - fastMap = reader.readBoolean("fastMap"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG, CacheEntryPredicate.class); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - hasPrimary = reader.readBoolean("hasPrimary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: - invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR, byte[].class); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 14: - keepBinary = reader.readBoolean("keepBinary"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 15: - keys = reader.readCollection("keys", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 16: - byte opOrd; - - opOrd = reader.readByte("op"); - - if (!reader.isLastRead()) - return false; - - op = GridCacheOperation.fromOrdinal(opOrd); - - reader.incrementState(); - - case 17: - retval = reader.readBoolean("retval"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 18: - skipStore = reader.readBoolean("skipStore"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 19: - subjId = reader.readUuid("subjId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 20: - byte syncModeOrd; - - syncModeOrd = reader.readByte("syncMode"); - - if (!reader.isLastRead()) - return false; - - syncMode = CacheWriteSynchronizationMode.fromOrdinal(syncModeOrd); - - reader.incrementState(); - - case 21: - taskNameHash = reader.readInt("taskNameHash"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 22: - topLocked = reader.readBoolean("topLocked"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 23: - topVer = reader.readMessage("topVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 24: - updateVer = reader.readMessage("updateVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 25: - vals = reader.readCollection("vals", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridNearAtomicUpdateRequest.class); - } - - /** {@inheritDoc} */ - @Override public void cleanup(boolean clearKeys) { - vals = null; - entryProcessors = null; - entryProcessorsBytes = null; - invokeArgs = null; - invokeArgsBytes = null; - - if (clearKeys) - keys = null; - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 40; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 26; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridNearAtomicUpdateRequest.class, this, "filter", Arrays.toString(filter), - "parent", super.toString()); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java deleted file mode 100644 index 84a4a9f..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateResponse.java +++ /dev/null @@ -1,575 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; - -import java.io.Externalizable; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.GridDirectTransient; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheDeployable; -import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheReturn; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.internal.util.tostring.GridToStringInclude; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; -import org.jetbrains.annotations.Nullable; - -/** - * DHT atomic cache near update response. - */ -public class GridNearAtomicUpdateResponse extends GridCacheMessage implements GridCacheDeployable, GridNearAtomicUpdateResponseInterface { - /** */ - private static final long serialVersionUID = 0L; - - /** Cache message index. */ - public static final int CACHE_MSG_IDX = nextIndexId(); - - /** Node ID this reply should be sent to. */ - @GridDirectTransient - private UUID nodeId; - - /** Future version. */ - private GridCacheVersion futVer; - - /** Update error. */ - @GridDirectTransient - private volatile IgniteCheckedException err; - - /** Serialized error. */ - private byte[] errBytes; - - /** Return value. */ - @GridToStringInclude - private GridCacheReturn ret; - - /** Failed keys. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private volatile Collection failedKeys; - - /** Keys that should be remapped. */ - @GridToStringInclude - @GridDirectCollection(KeyCacheObject.class) - private List remapKeys; - - /** Indexes of keys for which values were generated on primary node (used if originating node has near cache). */ - @GridDirectCollection(int.class) - private List nearValsIdxs; - - /** Indexes of keys for which update was skipped (used if originating node has near cache). */ - @GridDirectCollection(int.class) - private List nearSkipIdxs; - - /** Values generated on primary node which should be put to originating node's near cache. */ - @GridToStringInclude - @GridDirectCollection(CacheObject.class) - private List nearVals; - - /** Version generated on primary node to be used for originating node's near cache update. */ - private GridCacheVersion nearVer; - - /** Near TTLs. */ - private GridLongList nearTtls; - - /** Near expire times. */ - private GridLongList nearExpireTimes; - - /** - * Empty constructor required by {@link Externalizable}. - */ - public GridNearAtomicUpdateResponse() { - // No-op. - } - - /** - * @param cacheId Cache ID. - * @param nodeId Node ID this reply should be sent to. - * @param futVer Future version. - * @param addDepInfo Deployment info flag. - */ - public GridNearAtomicUpdateResponse(int cacheId, UUID nodeId, GridCacheVersion futVer, boolean addDepInfo) { - assert futVer != null; - - this.cacheId = cacheId; - this.nodeId = nodeId; - this.futVer = futVer; - this.addDepInfo = addDepInfo; - } - - /** {@inheritDoc} */ - @Override public int lookupIndex() { - return CACHE_MSG_IDX; - } - - /** {@inheritDoc} */ - @Override public UUID nodeId() { - return nodeId; - } - - /** {@inheritDoc} */ - @Override public void nodeId(UUID nodeId) { - this.nodeId = nodeId; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion futureVersion() { - return futVer; - } - - /** {@inheritDoc} */ - @Override public void error(IgniteCheckedException err) { - this.err = err; - } - - /** {@inheritDoc} */ - @Override public IgniteCheckedException error() { - return err; - } - - /** {@inheritDoc} */ - @Override public Collection failedKeys() { - return failedKeys; - } - - /** {@inheritDoc} */ - @Override public GridCacheReturn returnValue() { - return ret; - } - - /** {@inheritDoc} */ - @Override @SuppressWarnings("unchecked") - public void returnValue(GridCacheReturn ret) { - this.ret = ret; - } - - /** {@inheritDoc} */ - @Override public void remapKeys(List remapKeys) { - this.remapKeys = remapKeys; - } - - /** {@inheritDoc} */ - @Override public Collection remapKeys() { - return remapKeys; - } - - /** {@inheritDoc} */ - @Override public void addNearValue(int keyIdx, - @Nullable CacheObject val, - long ttl, - long expireTime) { - if (nearValsIdxs == null) { - nearValsIdxs = new ArrayList<>(); - nearVals = new ArrayList<>(); - } - - addNearTtl(keyIdx, ttl, expireTime); - - nearValsIdxs.add(keyIdx); - nearVals.add(val); - } - - /** {@inheritDoc} */ - @Override @SuppressWarnings("ForLoopReplaceableByForEach") - public void addNearTtl(int keyIdx, long ttl, long expireTime) { - if (ttl >= 0) { - if (nearTtls == null) { - nearTtls = new GridLongList(16); - - for (int i = 0; i < keyIdx; i++) - nearTtls.add(-1L); - } - } - - if (nearTtls != null) - nearTtls.add(ttl); - - if (expireTime >= 0) { - if (nearExpireTimes == null) { - nearExpireTimes = new GridLongList(16); - - for (int i = 0; i < keyIdx; i++) - nearExpireTimes.add(-1); - } - } - - if (nearExpireTimes != null) - nearExpireTimes.add(expireTime); - } - - /** {@inheritDoc} */ - @Override public long nearExpireTime(int idx) { - if (nearExpireTimes != null) { - assert idx >= 0 && idx < nearExpireTimes.size(); - - return nearExpireTimes.get(idx); - } - - return -1L; - } - - /** {@inheritDoc} */ - @Override public long nearTtl(int idx) { - if (nearTtls != null) { - assert idx >= 0 && idx < nearTtls.size(); - - return nearTtls.get(idx); - } - - return -1L; - } - - /** {@inheritDoc} */ - @Override public void nearVersion(GridCacheVersion nearVer) { - this.nearVer = nearVer; - } - - /** {@inheritDoc} */ - @Override public GridCacheVersion nearVersion() { - return nearVer; - } - - /** {@inheritDoc} */ - @Override public void addSkippedIndex(int keyIdx) { - if (nearSkipIdxs == null) - nearSkipIdxs = new ArrayList<>(); - - nearSkipIdxs.add(keyIdx); - - addNearTtl(keyIdx, -1L, -1L); - } - - /** {@inheritDoc} */ - @Override @Nullable public List skippedIndexes() { - return nearSkipIdxs; - } - - /** {@inheritDoc} */ - @Override @Nullable public List nearValuesIndexes() { - return nearValsIdxs; - } - - /** {@inheritDoc} */ - @Override @Nullable public CacheObject nearValue(int idx) { - return nearVals.get(idx); - } - - /** {@inheritDoc} */ - @Override public synchronized void addFailedKey(KeyCacheObject key, Throwable e) { - if (failedKeys == null) - failedKeys = new ConcurrentLinkedQueue<>(); - - failedKeys.add(key); - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); - } - - /** {@inheritDoc} */ - @Override public synchronized void addFailedKeys(Collection keys, Throwable e) { - if (keys != null) { - if (failedKeys == null) - failedKeys = new ArrayList<>(keys.size()); - - failedKeys.addAll(keys); - } - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); - } - - /** {@inheritDoc} */ - @Override public synchronized void addFailedKeys(Collection keys, Throwable e, - GridCacheContext ctx) { - if (failedKeys == null) - failedKeys = new ArrayList<>(keys.size()); - - failedKeys.addAll(keys); - - if (err == null) - err = new IgniteCheckedException("Failed to update keys on primary node."); - - err.addSuppressed(e); - } - - /** {@inheritDoc} */ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (err != null && errBytes == null) - errBytes = ctx.marshaller().marshal(err); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - prepareMarshalCacheObjects(failedKeys, cctx); - - prepareMarshalCacheObjects(remapKeys, cctx); - - prepareMarshalCacheObjects(nearVals, cctx); - - if (ret != null) - ret.prepareMarshal(cctx); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (errBytes != null && err == null) - err = ctx.marshaller().unmarshal(errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); - - GridCacheContext cctx = ctx.cacheContext(cacheId); - - finishUnmarshalCacheObjects(failedKeys, cctx, ldr); - - finishUnmarshalCacheObjects(remapKeys, cctx, ldr); - - finishUnmarshalCacheObjects(nearVals, cctx, ldr); - - if (ret != null) - ret.finishUnmarshal(cctx, ldr); - } - - /** {@inheritDoc} */ - @Override public boolean addDeploymentInfo() { - return addDepInfo; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!super.writeTo(buf, writer)) - return false; - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 3: - if (!writer.writeByteArray("errBytes", errBytes)) - return false; - - writer.incrementState(); - - case 4: - if (!writer.writeCollection("failedKeys", failedKeys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 5: - if (!writer.writeMessage("futVer", futVer)) - return false; - - writer.incrementState(); - - case 6: - if (!writer.writeMessage("nearExpireTimes", nearExpireTimes)) - return false; - - writer.incrementState(); - - case 7: - if (!writer.writeCollection("nearSkipIdxs", nearSkipIdxs, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 8: - if (!writer.writeMessage("nearTtls", nearTtls)) - return false; - - writer.incrementState(); - - case 9: - if (!writer.writeCollection("nearVals", nearVals, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 10: - if (!writer.writeCollection("nearValsIdxs", nearValsIdxs, MessageCollectionItemType.INT)) - return false; - - writer.incrementState(); - - case 11: - if (!writer.writeMessage("nearVer", nearVer)) - return false; - - writer.incrementState(); - - case 12: - if (!writer.writeCollection("remapKeys", remapKeys, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 13: - if (!writer.writeMessage("ret", ret)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - if (!super.readFrom(buf, reader)) - return false; - - switch (reader.state()) { - case 3: - errBytes = reader.readByteArray("errBytes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 4: - failedKeys = reader.readCollection("failedKeys", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 5: - futVer = reader.readMessage("futVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 6: - nearExpireTimes = reader.readMessage("nearExpireTimes"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 7: - nearSkipIdxs = reader.readCollection("nearSkipIdxs", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 8: - nearTtls = reader.readMessage("nearTtls"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 9: - nearVals = reader.readCollection("nearVals", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 10: - nearValsIdxs = reader.readCollection("nearValsIdxs", MessageCollectionItemType.INT); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 11: - nearVer = reader.readMessage("nearVer"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 12: - remapKeys = reader.readCollection("remapKeys", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 13: - ret = reader.readMessage("ret"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(GridNearAtomicUpdateResponse.class); - } - - /** {@inheritDoc} */ - @Override public byte directType() { - return 41; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 14; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(GridNearAtomicUpdateResponse.class, this, "parent"); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java index 5d5344e..55cd231 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCacheTest.java @@ -50,7 +50,7 @@ import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; @@ -816,7 +816,7 @@ public class IgniteClientReconnectCacheTest extends IgniteClientReconnectAbstrac if (ccfg.getAtomicityMode() == ATOMIC) checkOperationInProgressFails(client, ccfg, F.>asList( - GridNearAtomicUpdateResponse.class, GridNearAtomicSingleUpdateResponse.class), + GridNearAtomicMultipleUpdateResponse.class, GridNearAtomicSingleUpdateResponse.class), putOp); else checkOperationInProgressFails(client, ccfg, GridNearTxPrepareResponse.class, putOp); http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java index 22d56b0..4c011db 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientReconnectCollectionsTest.java @@ -26,7 +26,7 @@ import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteQueue; import org.apache.ignite.IgniteSet; import org.apache.ignite.configuration.CollectionConfiguration; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponseInterface; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.testframework.GridTestUtils; @@ -316,7 +316,7 @@ public class IgniteClientReconnectCollectionsTest extends IgniteClientReconnectA BlockTpcCommunicationSpi commSpi = commSpi(srv); if (colCfg.getAtomicityMode() == ATOMIC) - commSpi.blockMessage(GridNearAtomicUpdateResponse.class); + commSpi.blockMessage(GridNearAtomicMultipleUpdateResponse.class); else commSpi.blockMessage(GridNearTxPrepareResponse.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java index 358cc58..b56a8a6 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheAtomicMessageCountSelfTest.java @@ -27,10 +27,10 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.managers.communication.GridIoMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicMultipleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicSingleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; @@ -139,9 +139,9 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest TestCommunicationSpi commSpi = (TestCommunicationSpi)grid(0).configuration().getCommunicationSpi(); - commSpi.registerMessage(GridNearAtomicUpdateRequest.class); + commSpi.registerMessage(GridNearAtomicMultipleUpdateRequest.class); commSpi.registerMessage(GridNearAtomicSingleUpdateRequest.class); - commSpi.registerMessage(GridDhtAtomicUpdateRequest.class); + commSpi.registerMessage(GridDhtAtomicMultipleUpdateRequest.class); commSpi.registerMessage(GridDhtAtomicSingleUpdateRequest.class); int putCnt = 15; @@ -201,7 +201,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest * @return Count. */ private int nearRequestsCount(TestCommunicationSpi commSpi) { - return commSpi.messageCount(GridNearAtomicUpdateRequest.class) + + return commSpi.messageCount(GridNearAtomicMultipleUpdateRequest.class) + commSpi.messageCount(GridNearAtomicSingleUpdateRequest.class); } @@ -212,7 +212,7 @@ public class GridCacheAtomicMessageCountSelfTest extends GridCommonAbstractTest * @return Count. */ private int dhtRequestsCount(TestCommunicationSpi commSpi) { - return commSpi.messageCount(GridDhtAtomicUpdateRequest.class) + + return commSpi.messageCount(GridDhtAtomicMultipleUpdateRequest.class) + commSpi.messageCount(GridDhtAtomicSingleUpdateRequest.class); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java index 024ff2f..43a111a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheAtomicStopBusySelfTest.java @@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.cache; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest; /** * Stopped node when client operations are executing. @@ -32,7 +32,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu /** {@inheritDoc} */ @Override public void testPut() throws Exception { - bannedMsg.set(GridNearAtomicUpdateRequest.class); + bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class); bannedMsg.set(GridNearAtomicSingleUpdateRequest.class); super.testPut(); @@ -40,7 +40,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu /** {@inheritDoc} */ @Override public void testPutBatch() throws Exception { - bannedMsg.set(GridNearAtomicUpdateRequest.class); + bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class); bannedMsg.set(GridNearAtomicSingleUpdateRequest.class); super.testPut(); @@ -48,7 +48,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu /** {@inheritDoc} */ @Override public void testPutAsync() throws Exception { - bannedMsg.set(GridNearAtomicUpdateRequest.class); + bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class); bannedMsg.set(GridNearAtomicSingleUpdateRequest.class); super.testPut(); @@ -56,7 +56,7 @@ public class IgniteCacheAtomicStopBusySelfTest extends IgniteCacheAbstractStopBu /** {@inheritDoc} */ @Override public void testRemove() throws Exception { - bannedMsg.set(GridNearAtomicUpdateRequest.class); + bannedMsg.set(GridNearAtomicMultipleUpdateRequest.class); bannedMsg.set(GridNearAtomicSingleUpdateRequest.class); super.testPut(); http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java index 8ecef5d..4733e19 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientNodeChangingTopologyTest.java @@ -62,7 +62,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicSingleUpdateRequest; -import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicMultipleUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequestInterface; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; @@ -233,10 +233,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); // Block messages requests for both nodes. - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id()); - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id()); final IgniteCache cache = ignite2.cache(null); @@ -278,7 +278,7 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac map.put(i, i + 1); // Block messages requests for single node. - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id()); putFut = GridTestUtils.runAsync(new Callable() { @@ -366,16 +366,16 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac TestCommunicationSpi spi = (TestCommunicationSpi)ignite3.configuration().getCommunicationSpi(); // Block messages requests for both nodes. - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id()); - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id()); - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite2.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite2.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite2.localNode().id()); - spi.record(GridNearAtomicUpdateRequest.class, GridNearAtomicSingleUpdateRequest.class); + spi.record(GridNearAtomicMultipleUpdateRequest.class, GridNearAtomicSingleUpdateRequest.class); final IgniteCache cache = ignite3.cache(null); @@ -469,10 +469,10 @@ public class IgniteCacheClientNodeChangingTopologyTest extends GridCommonAbstrac TestCommunicationSpi spi = (TestCommunicationSpi)ignite2.configuration().getCommunicationSpi(); // Block messages requests for both nodes. - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite0.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite0.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite0.localNode().id()); - spi.blockMessages(GridNearAtomicUpdateRequest.class, ignite1.localNode().id()); + spi.blockMessages(GridNearAtomicMultipleUpdateRequest.class, ignite1.localNode().id()); spi.blockMessages(GridNearAtomicSingleUpdateRequest.class, ignite1.localNode().id()); final IgniteCache cache = ignite2.cache(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/1829f441/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java index b6c89c7..0786b49 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridCacheAtomicInvalidPartitionHandlingSelfTest.java @@ -478,9 +478,9 @@ public class GridCacheAtomicInvalidPartitionHandlingSelfTest extends GridCommonA Object origMsg = msg.message(); return delay && ( - (origMsg instanceof GridNearAtomicUpdateRequest) || + (origMsg instanceof GridNearAtomicMultipleUpdateRequest) || (origMsg instanceof GridNearAtomicSingleUpdateRequest) || - (origMsg instanceof GridDhtAtomicUpdateRequest) || + (origMsg instanceof GridDhtAtomicMultipleUpdateRequest) || (origMsg instanceof GridDhtAtomicSingleUpdateRequest) ); }