ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [1/2] ignite git commit: Reworked filter in single update - it is no longer serialized as is.
Date Wed, 20 Apr 2016 08:20:07 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-2523-1 586ac9a75 -> f9c692c81


Reworked filter in single update - it is no longer serialized as is.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b674fe9e
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b674fe9e
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b674fe9e

Branch: refs/heads/ignite-2523-1
Commit: b674fe9ef62eec47d175e03cf787ee72bfff6c6a
Parents: 586ac9a
Author: vozerov-gridgain <vozerov@gridgain.com>
Authored: Wed Apr 20 10:44:12 2016 +0300
Committer: vozerov-gridgain <vozerov@gridgain.com>
Committed: Wed Apr 20 10:44:12 2016 +0300

----------------------------------------------------------------------
 .../cache/CacheEntryPredicateContainsValue.java |   7 ++
 .../processors/cache/CacheOperationFilter.java  |  11 ++
 .../dht/atomic/GridDhtAtomicCache.java          |   3 +-
 .../GridNearAtomicAbstractUpdateFuture.java     |   7 --
 .../GridNearAtomicSingleUpdateFuture.java       |  44 ++++++-
 .../GridNearAtomicSingleUpdateRequest.java      | 121 ++++++++++---------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  |   7 +-
 7 files changed, 128 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
index 3db8ae8..7a1ee8d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheEntryPredicateContainsValue.java
@@ -66,6 +66,13 @@ public class CacheEntryPredicateContainsValue extends CacheEntryPredicateAdapter
         return F.eq(thisVal, cacheVal);
     }
 
+    /**
+     * @return Value.
+     */
+    public CacheObject value() {
+        return val;
+    }
+
     /** {@inheritDoc} */
     @Override public void finishUnmarshal(GridCacheContext ctx, ClassLoader ldr) throws IgniteCheckedException
{
         val.finishUnmarshal(ctx.cacheObjectContext(), ldr);

http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
index 7fdfaac..ccd8863 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOperationFilter.java
@@ -35,6 +35,17 @@ public enum CacheOperationFilter {
     /** Equals to value. */
     EQUALS_VAL;
 
+    /** Enum values. */
+    private static final CacheOperationFilter[] VALS = values();
+
+    /**
+     * @param ord Ordinal value.
+     * @return Enum value.
+     */
+    @Nullable public static CacheOperationFilter fromOrdinal(int ord) {
+        return ord < 0 || ord >= VALS.length ? null : VALS[ord];
+    }
+
     /**
      * Creare predicate from operation filter.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index cbda827..f4d9aa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1098,6 +1098,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                 conflictRmvVer = ctx.versions().next(dcId);
         }
 
+        // TODO: Optimize - no array allocs!
         CacheEntryPredicate[] filters = CU.filterArray(filter);
 
         if (conflictPutVal == null && conflictRmvVer == null && !isFastMap(filters,
op)) {
@@ -1112,7 +1113,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K,
V> {
                 retval,
                 false,
                 opCtx != null ? opCtx.expiry() : null,
-                filters,
+                filter,
                 ctx.subjectIdPerCall(null, opCtx),
                 ctx.kernalContext().job().currentTaskNameHash(),
                 opCtx != null && opCtx.skipStore(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
index 7f52299..5cbe72a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
@@ -76,9 +75,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
     /** Expiry policy. */
     protected final ExpiryPolicy expiryPlc;
 
-    /** Optional filter. */
-    protected final CacheEntryPredicate[] filter;
-
     /** Subject ID. */
     protected final UUID subjId;
 
@@ -141,7 +137,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
      * @param retval Return value flag.
      * @param rawRetval Raw return value flag.
      * @param expiryPlc Expiry policy.
-     * @param filter Filter.
      * @param subjId Subject ID.
      * @param taskNameHash Task name hash.
      * @param skipStore Skip store flag.
@@ -158,7 +153,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         boolean retval,
         boolean rawRetval,
         @Nullable ExpiryPolicy expiryPlc,
-        CacheEntryPredicate[] filter,
         UUID subjId,
         int taskNameHash,
         boolean skipStore,
@@ -177,7 +171,6 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt
         this.retval = retval;
         this.rawRetval = rawRetval;
         this.expiryPlc = expiryPlc;
-        this.filter = filter;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
index 2bebc1d..196de69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java
@@ -25,6 +25,12 @@ import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicateHasValue;
+import org.apache.ignite.internal.processors.cache.CacheEntryPredicateNoValue;
+import org.apache.ignite.internal.processors.cache.CacheEntrySerializablePredicate;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheOperationFilter;
 import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
@@ -60,6 +66,9 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRA
  * DHT atomic cache near update future.
  */
 public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture
{
+    /** Optional filter. */
+    private final CacheEntryPredicate filter;
+
     /** Keys */
     private Object key;
 
@@ -100,7 +109,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         final boolean retval,
         final boolean rawRetval,
         @Nullable ExpiryPolicy expiryPlc,
-        final CacheEntryPredicate[] filter,
+        CacheEntryPredicate filter,
         UUID subjId,
         int taskNameHash,
         boolean skipStore,
@@ -108,11 +117,13 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         int remapCnt,
         boolean waitTopFut
     ) {
-        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter,
subjId, taskNameHash,
+        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, subjId,
taskNameHash,
             skipStore, keepBinary, remapCnt, waitTopFut);
 
         assert subjId != null;
 
+        this.filter = filter;
+
         this.key = key;
         this.val = val;
     }
@@ -624,6 +635,30 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
         GridNearAtomicAbstractUpdateRequest req;
 
         if (single) {
+            // TODO: Refactor that?
+            CacheOperationFilter filter0;
+            CacheObject filterVal = null;
+
+            if (filter == null)
+                filter0 = CacheOperationFilter.ALWAYS;
+            else {
+                if (filter instanceof CacheEntrySerializablePredicate) {
+                    CacheEntryPredicate pred = ((CacheEntrySerializablePredicate)filter).predicate();
+
+                    if (pred instanceof CacheEntryPredicateHasValue)
+                        filter0 = CacheOperationFilter.HAS_VAL;
+                    else {
+                        assert pred instanceof CacheEntryPredicateNoValue;
+
+                        filter0 = CacheOperationFilter.NO_VAL;
+                    }
+                }
+                else {
+                    filter0 = CacheOperationFilter.EQUALS_VAL;
+                    filterVal = ((CacheEntryPredicateContainsValue)filter).value();
+                }
+            }
+
             req = new GridNearAtomicSingleUpdateRequest(
                 cctx.cacheId(),
                 primary.id(),
@@ -637,7 +672,8 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 retval,
                 expiryPlc,
                 invokeArgs,
-                filter,
+                filter0,
+                filterVal,
                 subjId,
                 taskNameHash,
                 skipStore,
@@ -661,7 +697,7 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda
                 retval,
                 expiryPlc,
                 invokeArgs,
-                filter,
+                CU.filterArray(filter),
                 subjId,
                 taskNameHash,
                 skipStore,

http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
index 0a19eb4..efef7d2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateRequest.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
 import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheOperationFilter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -30,7 +31,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -44,7 +44,6 @@ import javax.cache.expiry.ExpiryPolicy;
 import javax.cache.processor.EntryProcessor;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -116,7 +115,10 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
     private byte[] expiryPlcBytes;
 
     /** Filter. */
-    private CacheEntryPredicate[] filter;
+    private CacheOperationFilter filter;
+
+    /** Filter value (expected value). */
+    private CacheObject filterVal;
 
     /** Subject ID. */
     private UUID subjId;
@@ -183,7 +185,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
         boolean retval,
         @Nullable ExpiryPolicy expiryPlc,
         @Nullable Object[] invokeArgs,
-        @Nullable CacheEntryPredicate[] filter,
+        CacheOperationFilter filter,
+        @Nullable CacheObject filterVal,
         @Nullable UUID subjId,
         int taskNameHash,
         boolean skipStore,
@@ -209,6 +212,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
         this.expiryPlc = expiryPlc;
         this.invokeArgs = invokeArgs;
         this.filter = filter;
+        this.filterVal = filterVal;
         this.subjId = subjId;
         this.taskNameHash = taskNameHash;
         this.skipStore = skipStore;
@@ -305,12 +309,13 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
     /** {@inheritDoc} */
     @Override @Nullable public CacheEntryPredicate[] filter() {
-        return filter;
+        // TODO: Optimzie - no allocs!
+        return CU.filterArray(filter.createPredicate(filterVal));
     }
 
     /** {@inheritDoc} */
     @Override public boolean hasFilter() {
-        return !F.isEmpty(filter);
+        return filter != null;
     }
 
     /** {@inheritDoc} */
@@ -421,21 +426,6 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
         prepareMarshalCacheObject(key, cctx);
 
-        if (filter != null) {
-            boolean hasFilter = false;
-
-            for (CacheEntryPredicate p : filter) {
-                if (p != null) {
-                    hasFilter = true;
-
-                    p.prepareMarshal(cctx);
-                }
-            }
-
-            if (!hasFilter)
-                filter = null;
-        }
-
         if (expiryPlc != null && expiryPlcBytes == null)
             expiryPlcBytes = CU.marshal(cctx, new IgniteExternalizableExpiryPolicy(expiryPlc));
 
@@ -452,6 +442,8 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
         }
         else
             prepareMarshalCacheObject(val, cctx);
+
+        prepareMarshalCacheObject(filterVal, cctx);
     }
 
     /** {@inheritDoc} */
@@ -472,12 +464,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
         else
             finishUnmarshalCacheObject(val, cctx, ldr);
 
-        if (filter != null) {
-            for (CacheEntryPredicate p : filter) {
-                if (p != null)
-                    p.finishUnmarshal(cctx, ldr);
-            }
-        }
+        finishUnmarshalCacheObject(filterVal, cctx, ldr);
 
         if (expiryPlcBytes != null && expiryPlc == null)
             expiryPlc = ctx.marshaller().unmarshal(expiryPlcBytes, U.resolveClassLoader(ldr,
ctx.gridConfig()));
@@ -528,95 +515,100 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeObjectArray("filter", filter, MessageCollectionItemType.MSG))
+                if (!writer.writeByte("filter", (byte)op.ordinal()))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("futVer", futVer))
+                if (!writer.writeMessage("filterVal", filterVal))
                     return false;
 
                 writer.incrementState();
 
             case 9:
-                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
+                if (!writer.writeMessage("futVer", futVer))
                     return false;
 
                 writer.incrementState();
 
             case 10:
-                if (!writer.writeBoolean("keepBinary", keepBinary))
+                if (!writer.writeObjectArray("invokeArgsBytes", invokeArgsBytes, MessageCollectionItemType.BYTE_ARR))
                     return false;
 
                 writer.incrementState();
 
             case 11:
-                if (!writer.writeMessage("key", key))
+                if (!writer.writeBoolean("keepBinary", keepBinary))
                     return false;
 
                 writer.incrementState();
 
             case 12:
-                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
+                if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
             case 13:
-                if (!writer.writeBoolean("retval", retval))
+                if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1))
                     return false;
 
                 writer.incrementState();
 
             case 14:
-                if (!writer.writeBoolean("skipStore", skipStore))
+                if (!writer.writeBoolean("retval", retval))
                     return false;
 
                 writer.incrementState();
 
             case 15:
-                if (!writer.writeUuid("subjId", subjId))
+                if (!writer.writeBoolean("skipStore", skipStore))
                     return false;
 
                 writer.incrementState();
 
             case 16:
-                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal()
: -1))
+                if (!writer.writeUuid("subjId", subjId))
                     return false;
 
                 writer.incrementState();
 
             case 17:
-                if (!writer.writeInt("taskNameHash", taskNameHash))
+                if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal()
: -1))
                     return false;
 
                 writer.incrementState();
 
             case 18:
-                if (!writer.writeBoolean("topLocked", topLocked))
+                if (!writer.writeInt("taskNameHash", taskNameHash))
                     return false;
 
                 writer.incrementState();
 
             case 19:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeBoolean("topLocked", topLocked))
                     return false;
 
                 writer.incrementState();
 
             case 20:
-                if (!writer.writeMessage("updateVer", updateVer))
+                if (!writer.writeMessage("topVer", topVer))
                     return false;
 
                 writer.incrementState();
 
             case 21:
-                if (!writer.writeMessage("val", val))
+                if (!writer.writeMessage("updateVer", updateVer))
                     return false;
 
                 writer.incrementState();
 
+            case 22:
+                if (!writer.writeMessage("val", val))
+                    return false;
+
+                writer.incrementState();
         }
 
         return true;
@@ -666,15 +658,19 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
                 reader.incrementState();
 
             case 7:
-                filter = reader.readObjectArray("filter", MessageCollectionItemType.MSG,
CacheEntryPredicate.class);
+                byte filterOrd;
+
+                filterOrd = reader.readByte("filter");
 
                 if (!reader.isLastRead())
                     return false;
 
+                filter = CacheOperationFilter.fromOrdinal(filterOrd);
+
                 reader.incrementState();
 
             case 8:
-                futVer = reader.readMessage("futVer");
+                filterVal = reader.readMessage("filterVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -682,7 +678,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
                 reader.incrementState();
 
             case 9:
-                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR,
byte[].class);
+                futVer = reader.readMessage("futVer");
 
                 if (!reader.isLastRead())
                     return false;
@@ -690,7 +686,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
                 reader.incrementState();
 
             case 10:
-                keepBinary = reader.readBoolean("keepBinary");
+                invokeArgsBytes = reader.readObjectArray("invokeArgsBytes", MessageCollectionItemType.BYTE_ARR,
byte[].class);
 
                 if (!reader.isLastRead())
                     return false;
@@ -698,7 +694,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
                 reader.incrementState();
 
             case 11:
-                key = reader.readMessage("key");
+                keepBinary = reader.readBoolean("keepBinary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -706,6 +702,14 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
                 reader.incrementState();
 
             case 12:
+                key = reader.readMessage("key");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 13:
                 byte opOrd;
 
                 opOrd = reader.readByte("op");
@@ -717,7 +721,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 13:
+            case 14:
                 retval = reader.readBoolean("retval");
 
                 if (!reader.isLastRead())
@@ -725,7 +729,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 14:
+            case 15:
                 skipStore = reader.readBoolean("skipStore");
 
                 if (!reader.isLastRead())
@@ -733,7 +737,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 15:
+            case 16:
                 subjId = reader.readUuid("subjId");
 
                 if (!reader.isLastRead())
@@ -741,7 +745,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 16:
+            case 17:
                 byte syncModeOrd;
 
                 syncModeOrd = reader.readByte("syncMode");
@@ -753,7 +757,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 17:
+            case 18:
                 taskNameHash = reader.readInt("taskNameHash");
 
                 if (!reader.isLastRead())
@@ -761,7 +765,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 18:
+            case 19:
                 topLocked = reader.readBoolean("topLocked");
 
                 if (!reader.isLastRead())
@@ -769,7 +773,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 19:
+            case 20:
                 topVer = reader.readMessage("topVer");
 
                 if (!reader.isLastRead())
@@ -777,7 +781,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 20:
+            case 21:
                 updateVer = reader.readMessage("updateVer");
 
                 if (!reader.isLastRead())
@@ -785,7 +789,7 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
                 reader.incrementState();
 
-            case 21:
+            case 22:
                 val = reader.readMessage("val");
 
                 if (!reader.isLastRead())
@@ -815,12 +819,11 @@ public class GridNearAtomicSingleUpdateRequest extends GridNearAtomicAbstractUpd
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 22;
+        return 23;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
-        return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "filter", Arrays.toString(filter),
-            "parent", super.toString());
+        return S.toString(GridNearAtomicSingleUpdateRequest.class, this, "parent", super.toString());
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b674fe9e/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index edebd8c..7021e2f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -67,6 +67,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
     /** Fast map flag. */
     private final boolean fastMap;
 
+    /** Optional filter. */
+    private final CacheEntryPredicate[] filter;
+
     /** Keys */
     private Collection<?> keys;
 
@@ -134,7 +137,7 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         int remapCnt,
         boolean waitTopFut
     ) {
-        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter,
subjId, taskNameHash,
+        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, subjId,
taskNameHash,
             skipStore, keepBinary, remapCnt, waitTopFut);
 
         assert vals == null || vals.size() == keys.size();
@@ -142,6 +145,8 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu
         assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
         assert subjId != null;
 
+        this.filter = filter;
+
         this.keys = keys;
         this.vals = vals;
         this.conflictPutVals = conflictPutVals;


Mime
View raw message