ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From voze...@apache.org
Subject [05/34] ignite git commit: Fixing keepBinary for continuous queries.
Date Thu, 26 Nov 2015 10:01:26 GMT
Fixing keepBinary for continuous queries.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01c24e7d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01c24e7d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01c24e7d

Branch: refs/heads/ignite-1956
Commit: 01c24e7d07e15df8a9a4722c0ec2a9366cb2f669
Parents: eeb9142
Author: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Authored: Mon Nov 23 21:40:52 2015 +0300
Committer: Alexey Goncharuk <alexey.goncharuk@gmail.com>
Committed: Mon Nov 23 21:40:52 2015 +0300

----------------------------------------------------------------------
 .../ignite/codegen/MessageCodeGenerator.java    |  1 +
 .../internal/GridEventConsumeHandler.java       |  5 ++
 .../internal/GridMessageListenHandler.java      |  5 ++
 .../processors/cache/IgniteCacheProxy.java      | 14 +++--
 .../continuous/CacheContinuousQueryEntry.java   | 50 ++++++++++++-----
 .../continuous/CacheContinuousQueryEvent.java   |  6 +--
 .../continuous/CacheContinuousQueryHandler.java | 33 +++++++++---
 .../CacheContinuousQueryListener.java           |  5 ++
 .../continuous/CacheContinuousQueryManager.java | 57 ++++++++++++--------
 .../continuous/GridContinuousHandler.java       |  5 ++
 .../continuous/GridContinuousProcessor.java     | 10 +++-
 .../StartRoutineDiscoveryMessage.java           | 13 ++++-
 12 files changed, 156 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
----------------------------------------------------------------------
diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
index 5a31415..8733bb3 100644
--- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
+++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.GridDirectMap;
 import org.apache.ignite.internal.GridDirectTransient;
 import org.apache.ignite.internal.IgniteCodeGeneratingFail;
 import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateRequest;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.apache.ignite.internal.util.typedef.internal.U;

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 3918976..f4bbd6f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -125,6 +125,11 @@ class GridEventConsumeHandler implements GridContinuousHandler {
     }
 
     /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String cacheName() {
         throw new IllegalStateException();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
index aa837b8..9a2829b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java
@@ -98,6 +98,11 @@ public class GridMessageListenHandler implements GridContinuousHandler
{
     }
 
     /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
     @Override public String cacheName() {
         throw new IllegalStateException();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index cb36432..2a52a1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -555,7 +555,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
      * @return Initial iteration cursor.
      */
     @SuppressWarnings("unchecked")
-    private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc) {
+    private QueryCursor<Cache.Entry<K, V>> queryContinuous(ContinuousQuery qry, boolean loc, boolean keepBinary) {
         if (qry.getInitialQuery() instanceof ContinuousQuery)
             throw new IgniteException("Initial predicate for continuous query can't be an instance of another " +
                 "continuous query. Use SCAN or SQL query for initial iteration.");
@@ -570,7 +570,8 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
                 qry.getPageSize(),
                 qry.getTimeInterval(),
                 qry.isAutoUnsubscribe(),
-                loc);
+                loc,
+                keepBinary);
 
             final QueryCursor<Cache.Entry<K, V>> cur =
                 qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null;
@@ -616,8 +617,11 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
 
             validate(qry);
 
+            CacheOperationContext opCtxCall = ctx.operationContextPerCall();
+
             if (qry instanceof ContinuousQuery)
-                return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal());
+                return (QueryCursor<R>)queryContinuous((ContinuousQuery<K, V>)qry, qry.isLocal(),
+                    opCtxCall != null && opCtxCall.isKeepBinary());
 
             if (qry instanceof SqlQuery) {
                 final SqlQuery p = (SqlQuery)qry;
@@ -1623,7 +1627,9 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K,
V
         CacheOperationContext prev = onEnter(gate, opCtx);
 
         try {
-            ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false);
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            ctx.continuousQueries().executeJCacheQuery(lsnrCfg, false, opCtx != null && opCtx.isKeepBinary());
         }
         catch (IgniteCheckedException e) {
             throw cacheException(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
index 0495e6d..4d3786a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java
@@ -99,6 +99,9 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message
{
     /** Filtered events. */
     private GridLongList filteredEvts;
 
+    /** Keep binary. */
+    private boolean keepBinary;
+
     /**
      * Required by {@link Message}.
      */
@@ -122,6 +125,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
         KeyCacheObject key,
         @Nullable CacheObject newVal,
         @Nullable CacheObject oldVal,
+        boolean keepBinary,
         int part,
         long updateCntr,
         @Nullable AffinityTopologyVersion topVer) {
@@ -133,6 +137,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
         this.part = part;
         this.updateCntr = updateCntr;
         this.topVer = topVer;
+        this.keepBinary = keepBinary;
     }
 
     /**
@@ -203,6 +208,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
     }
 
     /**
+     * @return Keep binary flag.
+     */
+    boolean isKeepBinary() {
+        return keepBinary;
+    }
+
+    /**
      * @param cntrs Filtered events.
      */
     void filteredEvents(GridLongList cntrs) {
@@ -322,36 +334,42 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 writer.incrementState();
 
             case 4:
-                if (!writer.writeMessage("key", key))
+                if (!writer.writeBoolean("keepBinary", keepBinary))
                     return false;
 
                 writer.incrementState();
 
             case 5:
-                if (!writer.writeMessage("newVal", newVal))
+                if (!writer.writeMessage("key", key))
                     return false;
 
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeMessage("oldVal", oldVal))
+                if (!writer.writeMessage("newVal", newVal))
                     return false;
 
                 writer.incrementState();
 
             case 7:
-                if (!writer.writeInt("part", part))
+                if (!writer.writeMessage("oldVal", oldVal))
                     return false;
 
                 writer.incrementState();
 
             case 8:
-                if (!writer.writeMessage("topVer", topVer))
+                if (!writer.writeInt("part", part))
                     return false;
 
                 writer.incrementState();
 
             case 9:
+                if (!writer.writeMessage("topVer", topVer))
+                    return false;
+
+                writer.incrementState();
+
+            case 10:
                 if (!writer.writeLong("updateCntr", updateCntr))
                     return false;
 
@@ -407,7 +425,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 reader.incrementState();
 
             case 4:
-                key = reader.readMessage("key");
+                keepBinary = reader.readBoolean("keepBinary");
 
                 if (!reader.isLastRead())
                     return false;
@@ -415,7 +433,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 reader.incrementState();
 
             case 5:
-                newVal = reader.readMessage("newVal");
+                key = reader.readMessage("key");
 
                 if (!reader.isLastRead())
                     return false;
@@ -423,7 +441,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 reader.incrementState();
 
             case 6:
-                oldVal = reader.readMessage("oldVal");
+                newVal = reader.readMessage("newVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -431,7 +449,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 reader.incrementState();
 
             case 7:
-                part = reader.readInt("part");
+                oldVal = reader.readMessage("oldVal");
 
                 if (!reader.isLastRead())
                     return false;
@@ -439,7 +457,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 reader.incrementState();
 
             case 8:
-                topVer = reader.readMessage("topVer");
+                part = reader.readInt("part");
 
                 if (!reader.isLastRead())
                     return false;
@@ -447,6 +465,14 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
                 reader.incrementState();
 
             case 9:
+                topVer = reader.readMessage("topVer");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 10:
                 updateCntr = reader.readLong("updateCntr");
 
                 if (!reader.isLastRead())
@@ -461,11 +487,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable,
Message {
 
     /** {@inheritDoc} */
     @Override public byte fieldsCount() {
-        return 10;
+        return 11;
     }
 
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(CacheContinuousQueryEntry.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
index d26be5f..f665339 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEvent.java
@@ -59,18 +59,18 @@ class CacheContinuousQueryEvent<K, V> extends CacheEntryEvent<K,
V> {
     /** {@inheritDoc} */
     @Override
     public K getKey() {
-        return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), true, false);
+        return (K)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.key(), e.isKeepBinary(), false);
     }
 
     /** {@inheritDoc} */
     @Override public V getValue() {
-        return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), true, false);
+        return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.value(), e.isKeepBinary(), false);
     }
 
     /** {@inheritDoc} */
     @Override
     public V getOldValue() {
-        return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), true, false);
+        return (V)cctx.cacheObjectContext().unwrapPortableIfNeeded(e.oldValue(), e.isKeepBinary(), false);
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index b69d4cd..aa9bea3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -81,7 +81,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_QUERY_OBJECT_READ;
 /**
  * Continuous query handler.
  */
-class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
+public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler {
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -128,7 +128,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     private transient Collection<CacheContinuousQueryEntry> backupQueue;
 
     /** */
-    private boolean localCache;
+    private boolean locCache;
+
+    /** */
+    private transient boolean keepBinary;
 
     /** */
     private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
@@ -180,7 +183,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         boolean ignoreExpired,
         int taskHash,
         boolean skipPrimaryCheck,
-        boolean locCache) {
+        boolean locCache,
+        boolean keepBinary) {
         assert topic != null;
         assert locLsnr != null;
 
@@ -195,7 +199,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
         this.ignoreExpired = ignoreExpired;
         this.taskHash = taskHash;
         this.skipPrimaryCheck = skipPrimaryCheck;
-        this.localCache = locCache;
+        this.locCache = locCache;
+        this.keepBinary = keepBinary;
 
         cacheId = CU.cacheId(cacheName);
     }
@@ -216,6 +221,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
     }
 
     /** {@inheritDoc} */
+    @Override public boolean keepBinary() {
+        return keepBinary;
+    }
+
+    /**
+     * @param keepBinary Keep binary flag.
+     */
+    public void keepBinary(boolean keepBinary) {
+        this.keepBinary = keepBinary;
+    }
+
+    /** {@inheritDoc} */
     @Override public String cacheName() {
         return cacheName;
     }
@@ -284,6 +301,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
                 }
             }
 
+            /** {@inheritDoc} */
+            @Override public boolean keepBinary() {
+                return keepBinary;
+            }
+
             @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary,
                 boolean recordIgniteEvt) {
                 if (ignoreExpired && evt.getEventType() == EventType.EXPIRED)
@@ -317,7 +339,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
 
                     if (primary || skipPrimaryCheck) {
                         if (loc) {
-                            if (!localCache) {
+                            if (!locCache) {
                                 Collection<CacheContinuousQueryEntry> entries = handleEvent(ctx, entry);
 
                                 if (!entries.isEmpty()) {
@@ -848,7 +870,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
{
 
         /**
          * @param e Entry.
-         * @param topVer Topology version.
          * @return Continuous query entry.
          */
         private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index 8342acf..86abbef 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -81,6 +81,11 @@ interface CacheContinuousQueryListener<K, V> {
     public boolean oldValueRequired();
 
     /**
+     * @return Keep binary flag.
+     */
+    public boolean keepBinary();
+
+    /**
      * @return Whether to notify on existing entries.
      */
     public boolean notifyExisting();

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b2e7490..0e4cb40 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -135,7 +135,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
 
         if (cfgs != null) {
             for (CacheEntryListenerConfiguration cfg : cfgs)
-                executeJCacheQuery(cfg, true);
+                executeJCacheQuery(cfg, true, false);
         }
     }
 
@@ -161,21 +161,23 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      */
     public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) {
         if (lsnrCnt.get() > 0) {
-            CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
-                cctx.cacheId(),
-                UPDATED,
-                key,
-                null,
-                null,
-                partId,
-                updCntr,
-                topVer);
+            for (CacheContinuousQueryListener lsnr : lsnrs.values()) {
+                CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry(
+                    cctx.cacheId(),
+                    UPDATED,
+                    key,
+                    null,
+                    null,
+                    lsnr.keepBinary(),
+                    partId,
+                    updCntr,
+                    topVer);
 
-            CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
+                CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>(
                     cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0);
 
-            for (CacheContinuousQueryListener lsnr : lsnrs.values())
                 lsnr.skipUpdateEvent(evt, topVer);
+            }
         }
     }
 
@@ -253,6 +255,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 key,
                 newVal,
                 lsnr.oldValueRequired() ? oldVal : null,
+                lsnr.keepBinary(),
                 partId,
                 updateCntr,
                 topVer);
@@ -306,6 +309,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                     key,
                     null,
                     lsnr.oldValueRequired() ? oldVal : null,
+                    lsnr.keepBinary(),
                     e.partition(),
                     -1,
                     null);
@@ -333,7 +337,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         int bufSize,
         long timeInterval,
         boolean autoUnsubscribe,
-        boolean loc) throws IgniteCheckedException
+        boolean loc,
+        boolean keepBinary) throws IgniteCheckedException
     {
         return executeQuery0(
             locLsnr,
@@ -346,7 +351,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             true,
             false,
             true,
-            loc);
+            loc,
+            keepBinary);
     }
 
     /**
@@ -374,7 +380,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             true,
             false,
             true,
-            loc);
+            loc,
+            false);
     }
 
     /**
@@ -395,9 +402,9 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
      * @param onStart Whether listener is created on node start.
      * @throws IgniteCheckedException If failed.
      */
-    public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart)
+    public void executeJCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary)
         throws IgniteCheckedException {
-        JCacheQuery lsnr = new JCacheQuery(cfg, onStart);
+        JCacheQuery lsnr = new JCacheQuery(cfg, onStart, keepBinary);
 
         JCacheQuery old = jCacheLsnrs.putIfAbsent(cfg, lsnr);
 
@@ -471,7 +478,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         boolean oldValRequired,
         boolean sync,
         boolean ignoreExpired,
-        boolean loc) throws IgniteCheckedException
+        boolean loc,
+        final boolean keepBinary) throws IgniteCheckedException
     {
         cctx.checkSecurity(SecurityPermission.CACHE_READ);
 
@@ -492,7 +500,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
             ignoreExpired,
             taskNameHash,
             skipPrimaryCheck,
-            cctx.isLocal());
+            cctx.isLocal(),
+            keepBinary);
 
         IgnitePredicate<ClusterNode> pred = (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) ?
             F.nodeForNodeId(cctx.localNodeId()) : F.<ClusterNode>alwaysTrue();
@@ -550,6 +559,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                                     e.key(),
                                     e.rawGet(),
                                     null,
+                                    keepBinary,
                                     0,
                                     -1,
                                     null);
@@ -633,15 +643,19 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
         private final boolean onStart;
 
         /** */
+        private final boolean keepBinary;
+
+        /** */
         private volatile UUID routineId;
 
         /**
          * @param cfg Listener configuration.
          * @param onStart {@code True} if executed on cache start.
          */
-        private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart) {
+        private JCacheQuery(CacheEntryListenerConfiguration cfg, boolean onStart, boolean keepBinary) {
             this.cfg = cfg;
             this.onStart = onStart;
+            this.keepBinary = keepBinary;
         }
 
         /**
@@ -694,7 +708,8 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter
{
                 cfg.isOldValueRequired(),
                 cfg.isSynchronous(),
                 false,
-                false);
+                false,
+                keepBinary);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
index d8698b3..68b83ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java
@@ -143,6 +143,11 @@ public interface GridContinuousHandler extends Externalizable, Cloneable
{
     public boolean isForQuery();
 
     /**
+     * @return {@code True} if Ignite Binary objects should be passed to the listener and filter.
+     */
+    public boolean keepBinary();
+
+    /**
      * @return Cache name if this is a continuous query handler.
      */
     public String cacheName();

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index e218790..f473d02 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -612,7 +613,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             if (locIncluded && registerHandler(ctx.localNodeId(), routineId, hnd, bufSize, interval, autoUnsubscribe, true))
                 hnd.onListenerRegistered(routineId, ctx);
 
-            ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData));
+            ctx.discovery().sendCustomEvent(new StartRoutineDiscoveryMessage(routineId, reqData,
+                reqData.handler().keepBinary()));
         }
         catch (IgniteCheckedException e) {
             startFuts.remove(routineId);
@@ -822,6 +824,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         GridContinuousHandler hnd = data.handler();
 
+        if (req.keepBinary()) {
+            assert hnd instanceof CacheContinuousQueryHandler;
+
+            ((CacheContinuousQueryHandler)hnd).keepBinary(true);
+        }
+
         IgniteCheckedException err = null;
 
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/01c24e7d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
index 82c0377..ff037d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java
@@ -40,14 +40,18 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage
{
     /** */
     private Map<Integer, Long> updateCntrs;
 
+    /** Keep binary flag. */
+    private boolean keepBinary;
+
     /**
      * @param routineId Routine id.
      * @param startReqData Start request data.
      */
-    public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData) {
+    public StartRoutineDiscoveryMessage(UUID routineId, StartRequestData startReqData, boolean keepBinary) {
         super(routineId);
 
         this.startReqData = startReqData;
+        this.keepBinary = keepBinary;
     }
 
     /**
@@ -88,6 +92,13 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage
{
         return errs;
     }
 
+    /**
+     * @return {@code True} if keep binary flag was set on continuous handler.
+     */
+    public boolean keepBinary() {
+        return keepBinary;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isMutable() {
         return true;


Mime
View raw message