ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [17/50] [abbrv] ignite git commit: IGNITE-3575 CPP: Added support for continuous queries remote filters.
Date Thu, 06 Apr 2017 13:31:24 GMT
IGNITE-3575 CPP: Added support for continuous queries remote filters.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d4da92b7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d4da92b7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d4da92b7

Branch: refs/heads/ignite-2893
Commit: d4da92b7ab2625c1e09d8ae06bf1eb9393162c8e
Parents: 79bac4f
Author: Igor Sapego <isapego@gridgain.com>
Authored: Fri Mar 31 16:40:25 2017 +0300
Committer: Igor Sapego <isapego@gridgain.com>
Committed: Fri Mar 31 16:40:25 2017 +0300

----------------------------------------------------------------------
 .../ignite/impl/binary/binary_type_impl.h       |   2 +-
 .../common/include/ignite/common/concurrent.h   |  30 +++
 .../cpp/common/include/ignite/reference.h       |  14 +-
 .../cpp/core-test/config/cache-test.xml         |   2 +-
 .../project/vs/core-test.vcxproj.filters        |   3 +
 .../cpp/core-test/src/cache_invoke_test.cpp     |   6 +-
 .../platforms/cpp/core-test/src/cache_test.cpp  |  23 ++-
 .../cpp/core-test/src/continuous_query_test.cpp | 202 ++++++++++++++++++-
 .../cpp/core-test/src/reference_test.cpp        |  12 +-
 modules/platforms/cpp/core/Makefile.am          |   1 +
 modules/platforms/cpp/core/include/Makefile.am  |  70 ++++---
 .../cpp/core/include/ignite/cache/cache.h       |  22 +-
 .../ignite/cache/cache_entry_processor.h        |  42 +---
 .../cache/event/cache_entry_event_filter.h      | 109 ++++++++++
 .../cache/query/continuous/continuous_query.h   |  35 +++-
 .../cpp/core/include/ignite/ignite_binding.h    |  39 +++-
 .../include/ignite/ignite_binding_context.h     |   2 +-
 .../cpp/core/include/ignite/impl/bindings.h     |  95 +++++++++
 .../impl/cache/cache_entry_processor_holder.h   |  15 --
 .../core/include/ignite/impl/cache/cache_impl.h |  81 +-------
 .../cache/event/cache_entry_event_filter_base.h |  66 ++++++
 .../event/cache_entry_event_filter_holder.h     | 185 +++++++++++++++++
 .../continuous/continuous_query_handle_impl.h   |  10 -
 .../query/continuous/continuous_query_impl.h    |  60 +++++-
 .../include/ignite/impl/ignite_binding_impl.h   | 101 +++++-----
 .../include/ignite/impl/ignite_environment.h    |  37 ++--
 .../cpp/core/include/ignite/impl/ignite_impl.h  |  10 +-
 .../cpp/core/include/ignite/impl/operations.h   |   2 +-
 .../platforms/cpp/core/project/vs/core.vcxproj  |   5 +
 .../cpp/core/project/vs/core.vcxproj.filters    |  18 ++
 .../cpp/core/src/impl/cache/cache_impl.cpp      |  90 ++++++++-
 .../continuous/continuous_query_handle_impl.cpp |   5 -
 .../cpp/core/src/impl/ignite_binding_impl.cpp   |  88 ++++++++
 .../cpp/core/src/impl/ignite_environment.cpp    | 124 ++++++++++--
 .../platforms/cpp/core/src/impl/ignite_impl.cpp |   2 +-
 35 files changed, 1284 insertions(+), 324 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
index d0cbb86..08c60c0 100644
--- a/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
+++ b/modules/platforms/cpp/binary/include/ignite/impl/binary/binary_type_impl.h
@@ -103,7 +103,7 @@ namespace ignite
                     ignite::binary::BinaryType<T> bt;
                     ignite::Reference<ignite::binary::BinaryIdentityResolver> resolver = bt.GetIdentityResolver();
 
-                    return resolver.Get().GetHashCode(obj);
+                    return resolver.Get()->GetHashCode(obj);
                 }
             };
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/common/include/ignite/common/concurrent.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/common/concurrent.h b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
index 84a1f0e..69b8eda 100644
--- a/modules/platforms/cpp/common/include/ignite/common/concurrent.h
+++ b/modules/platforms/cpp/common/include/ignite/common/concurrent.h
@@ -30,6 +30,11 @@ namespace ignite
         namespace concurrent
         {
             /**
+             * Type tag for static pointer cast.
+             */
+            struct StaticTag {};
+
+            /**
              * Default deleter implementation.
              *
              * @param obj Object to be deleted.
@@ -198,6 +203,20 @@ namespace ignite
                 }
 
                 /**
+                 * Static-cast constructor.
+                 *
+                 * @param other Instance to copy.
+                 */
+                template<typename T2>
+                SharedPointer(const SharedPointer<T2>& other, StaticTag) :
+                    ptr(static_cast<T*>(other.ptr)),
+                    impl(other.impl)
+                {
+                    if (impl)
+                        impl->Increment();
+                }
+
+                /**
                  * Assignment operator.
                  *
                  * @param other Other instance.
@@ -313,6 +332,17 @@ namespace ignite
             };
 
             /**
+             * Enables static-cast semantics for SharedPointer.
+             *
+             * @param val Value to cast.
+             */
+            template<class T1, class T2>
+            SharedPointer<T1> StaticPointerCast(const SharedPointer<T2>& val)
+            {
+                return SharedPointer<T1>(val, StaticTag());
+            }
+
+            /**
              * The class provides functionality that allows objects of derived
              * classes to create instances of shared_ptr pointing to themselves
              * and sharing ownership with existing shared_ptr objects.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/common/include/ignite/reference.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/common/include/ignite/reference.h b/modules/platforms/cpp/common/include/ignite/reference.h
index b026ad7..08cccec 100644
--- a/modules/platforms/cpp/common/include/ignite/reference.h
+++ b/modules/platforms/cpp/common/include/ignite/reference.h
@@ -160,9 +160,9 @@ namespace ignite
          *
          * @return Constant reference to underlying value.
          */
-        const T& Get() const
+        const T* Get() const
         {
-            return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+            return reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
         }
 
         /**
@@ -326,11 +326,11 @@ namespace ignite
          * If the pointer is null then this operation causes undefined
          * behaviour.
          *
-         * @return Constant reference to underlying value.
+         * @return Constant pointer to underlying value.
          */
-        const T& Get() const
+        const T* Get() const
         {
-            return *reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+            return reinterpret_cast<const T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
         }
 
         /**
@@ -341,9 +341,9 @@ namespace ignite
          *
          * @return Reference to underlying value.
          */
-        T& Get()
+        T* Get()
         {
-            return *reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
+            return reinterpret_cast<T*>(reinterpret_cast<ptrdiff_t>(ptr.Get()->Get()) + offset);
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/config/cache-test.xml
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/config/cache-test.xml b/modules/platforms/cpp/core-test/config/cache-test.xml
index 0ea5876..10300ba 100644
--- a/modules/platforms/cpp/core-test/config/cache-test.xml
+++ b/modules/platforms/cpp/core-test/config/cache-test.xml
@@ -55,7 +55,7 @@
                     <property name="cacheMode" value="PARTITIONED"/>
                     <property name="atomicityMode" value="TRANSACTIONAL"/>
                 </bean>
-				
+
                 <bean parent="cache-template">
                     <property name="name" value="partitioned2"/>
                     <property name="cacheMode" value="PARTITIONED"/>

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
index fb0be1b..5181f96 100644
--- a/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
+++ b/modules/platforms/cpp/core-test/project/vs/core-test.vcxproj.filters
@@ -142,5 +142,8 @@
     <None Include="..\..\config\cache-store.xml">
       <Filter>Configs</Filter>
     </None>
+    <None Include="..\..\config\cache-query-continuous.xml">
+      <Filter>Configs</Filter>
+    </None>
   </ItemGroup>
 </Project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp b/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
index db304e2..4f1f30a 100644
--- a/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_invoke_test.cpp
@@ -44,7 +44,7 @@ using namespace ignite::common;
 /**
  * CacheEntryModifier class for invoke tests.
  */
-class CacheEntryModifier : public CacheEntryProcessor<CacheEntryModifier, int, int, int, int>
+class CacheEntryModifier : public CacheEntryProcessor<int, int, int, int>
 {
 public:
     /**
@@ -151,7 +151,7 @@ namespace ignite
 /**
  * Divisor class for invoke tests.
  */
-class Divisor : public CacheEntryProcessor<Divisor, int, int, double, double>
+class Divisor : public CacheEntryProcessor<int, int, double, double>
 {
 public:
     /**
@@ -262,7 +262,7 @@ namespace ignite
 /**
  * Character remover class for invoke tests.
  */
-class CharRemover : public CacheEntryProcessor<CharRemover, std::string, std::string, int, bool>
+class CharRemover : public CacheEntryProcessor<std::string, std::string, int, bool>
 {
 public:
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/cache_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/cache_test.cpp b/modules/platforms/cpp/core-test/src/cache_test.cpp
index 437ed234..d57b757 100644
--- a/modules/platforms/cpp/core-test/src/cache_test.cpp
+++ b/modules/platforms/cpp/core-test/src/cache_test.cpp
@@ -29,16 +29,6 @@
 using namespace ignite;
 using namespace boost::unit_test;
 
-/* Nodes started during the test. */
-Ignite grid0 = Ignite();
-Ignite grid1 = Ignite();
-
-/** Cache accessor. */
-cache::Cache<int, int> Cache()
-{
-    return grid0.GetCache<int, int>("partitioned");
-}
-
 struct Person
 {
     std::string name;
@@ -88,7 +78,18 @@ namespace ignite
 /*
  * Test setup fixture.
  */
-struct CacheTestSuiteFixture {
+struct CacheTestSuiteFixture
+{
+    /* Nodes started during the test. */
+    Ignite grid0;
+    Ignite grid1;
+
+    /** Cache accessor. */
+    cache::Cache<int, int> Cache()
+    {
+        return grid0.GetCache<int, int>("partitioned");
+    }
+
     /*
      * Constructor.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
index 1be21c1..f81eb5d 100644
--- a/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
+++ b/modules/platforms/cpp/core-test/src/continuous_query_test.cpp
@@ -175,6 +175,61 @@ private:
     ConcurrentQueue< CacheEntryEvent<K, V> > eventQueue;
 };
 
+/**
+ * Only lets through keys from the range.
+ */
+template<typename K, typename V>
+struct RangeFilter : CacheEntryEventFilter<K, V>
+{
+    /**
+     * Default constructor.
+     */
+    RangeFilter() :
+        rangeBegin(0),
+        rangeEnd(0)
+    {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param from Range beginning. Inclusive.
+     * @param to Range end. Not inclusive.
+     */
+    RangeFilter(const K& from, const K& to) :
+        rangeBegin(from),
+        rangeEnd(to)
+    {
+        // No-op.
+    }
+
+    /**
+     * Destructor.
+     */
+    virtual ~RangeFilter()
+    {
+        // No-op.
+    }
+
+    /**
+     * Event callback.
+     *
+     * @param event Event.
+     * @return True if the event passes filter.
+     */
+    virtual bool Process(const CacheEntryEvent<K, V>& event)
+    {
+        return event.GetKey() >= rangeBegin && event.GetKey() < rangeEnd;
+    }
+
+    /** Beginning of the range. */
+    K rangeBegin;
+
+    /** End of the range. */
+    K rangeEnd;
+};
+
 /*
  * Test entry.
  */
@@ -204,10 +259,9 @@ namespace ignite
 {
     namespace binary
     {
-        /**
-        * Binary type definition.
-        */
-        IGNITE_BINARY_TYPE_START(TestEntry)
+        template<>
+        struct BinaryType<TestEntry>
+        {
             IGNITE_BINARY_GET_TYPE_ID_AS_HASH(TestEntry)
             IGNITE_BINARY_GET_TYPE_NAME_AS_IS(TestEntry)
             IGNITE_BINARY_GET_FIELD_ID_AS_HASH
@@ -227,8 +281,52 @@ namespace ignite
 
                 return res;
             }
+        };
 
-        IGNITE_BINARY_TYPE_END
+        template<typename K, typename V>
+        struct BinaryType< RangeFilter<K,V> >
+        {
+            int32_t GetTypeId()
+            {
+                return GetBinaryStringHashCode("RangeFilter");
+            }
+
+            std::string GetTypeName()
+            {
+                return "RangeFilter";
+
+            }
+            IGNITE_BINARY_GET_FIELD_ID_AS_HASH
+
+            int32_t GetHashCode(const RangeFilter<K,V>&)
+            {
+                return 0;
+            }
+
+            bool IsNull(const RangeFilter<K,V>&)
+            {
+                return false;
+            }
+
+            RangeFilter<K,V> GetNull()
+            {
+                return RangeFilter<K,V>();
+            }
+
+            void Write(BinaryWriter& writer, const RangeFilter<K,V>& obj)
+            {
+                writer.WriteObject("rangeBegin", obj.rangeBegin);
+                writer.WriteObject("rangeEnd", obj.rangeEnd);
+            }
+
+            RangeFilter<K,V> Read(BinaryReader& reader)
+            {
+                K begin = reader.ReadObject<K>("rangeBegin");
+                K end = reader.ReadObject<K>("rangeEnd");
+
+                return RangeFilter<K,V>(begin, end);
+            }
+        };
     }
 }
 
@@ -237,7 +335,7 @@ namespace ignite
  */
 struct ContinuousQueryTestSuiteFixture
 {
-    Ignite grid;
+    Ignite node;
 
     Cache<int, TestEntry> cache;
 
@@ -245,8 +343,8 @@ struct ContinuousQueryTestSuiteFixture
      * Constructor.
      */
     ContinuousQueryTestSuiteFixture() :
-        grid(ignite_test::StartNode("cache-query-continuous.xml", "node-01")),
-        cache(grid.GetCache<int, TestEntry>("transactional_no_backup"))
+        node(ignite_test::StartNode("cache-query-continuous.xml", "node-01")),
+        cache(node.GetCache<int, TestEntry>("transactional_no_backup"))
     {
         // No-op.
     }
@@ -258,7 +356,7 @@ struct ContinuousQueryTestSuiteFixture
     {
         Ignition::StopAll(false);
 
-        grid = Ignite();
+        node = Ignite();
     }
 };
 
@@ -581,4 +679,90 @@ BOOST_AUTO_TEST_CASE(TestPublicPrivateConstantsConsistence)
         static_cast<int>(QueryType::DEFAULT_BUFFER_SIZE));
 }
 
+BOOST_AUTO_TEST_CASE(TestFilterSingleNode)
+{
+    node.GetBinding().RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >();
+
+    Listener<int, TestEntry> lsnr;
+    RangeFilter<int, TestEntry> filter(100, 150);
+
+    ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr), MakeReference(filter));
+
+    ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+    cache.Put(1, TestEntry(10));
+    cache.Put(1, TestEntry(11));
+
+    cache.Put(2, TestEntry(20));
+    cache.Remove(2);
+
+    cache.Put(100, TestEntry(1000));
+    cache.Put(101, TestEntry(1010));
+
+    cache.Put(142, TestEntry(1420));
+    cache.Put(142, TestEntry(1421));
+    cache.Remove(142);
+
+    cache.Put(149, TestEntry(1490));
+    cache.Put(150, TestEntry(1500));
+    cache.Put(150, TestEntry(1502));
+    cache.Remove(150);
+
+    lsnr.CheckNextEvent(100, boost::none, TestEntry(1000));
+    lsnr.CheckNextEvent(101, boost::none, TestEntry(1010));
+
+    lsnr.CheckNextEvent(142, boost::none, TestEntry(1420));
+    lsnr.CheckNextEvent(142, TestEntry(1420), TestEntry(1421));
+    lsnr.CheckNextEvent(142, TestEntry(1421), boost::none);
+
+    lsnr.CheckNextEvent(149, boost::none, TestEntry(1490));
+}
+
+BOOST_AUTO_TEST_CASE(TestFilterMultipleNodes)
+{
+    Ignite node2 = ignite_test::StartNode("cache-query-continuous.xml", "node-02");
+    Ignite node3 = ignite_test::StartNode("cache-query-continuous.xml", "node-03");
+
+    node.GetBinding().RegisterCacheEntryEventFilter< RangeFilter<int, TestEntry> >();
+
+    Listener<int, TestEntry> lsnr;
+    RangeFilter<int, TestEntry> filter(100, 150);
+
+    ContinuousQuery<int, TestEntry> qry(MakeReference(lsnr), MakeReference(filter));
+
+    ContinuousQueryHandle<int, TestEntry> handle = cache.QueryContinuous(qry);
+
+    Cache<int, TestEntry> cache2 = node2.GetCache<int, TestEntry>("transactional_no_backup");
+
+    cache2.Put(1, TestEntry(10));
+    cache2.Put(1, TestEntry(11));
+
+    cache2.Put(2, TestEntry(20));
+    cache2.Remove(2);
+
+    cache2.Put(100, TestEntry(1000));
+    cache2.Put(101, TestEntry(1010));
+
+    cache2.Put(142, TestEntry(1420));
+    cache2.Put(142, TestEntry(1421));
+    cache2.Remove(142);
+
+    cache2.Put(149, TestEntry(1490));
+    cache2.Put(150, TestEntry(1500));
+    cache2.Put(150, TestEntry(1502));
+    cache2.Remove(150);
+
+    for (int i = 200; i < 250; ++i)
+        cache2.Put(i, TestEntry(i * 10));
+
+    lsnr.CheckNextEvent(100, boost::none, TestEntry(1000));
+    lsnr.CheckNextEvent(101, boost::none, TestEntry(1010));
+
+    lsnr.CheckNextEvent(142, boost::none, TestEntry(1420));
+    lsnr.CheckNextEvent(142, TestEntry(1420), TestEntry(1421));
+    lsnr.CheckNextEvent(142, TestEntry(1421), boost::none);
+
+    lsnr.CheckNextEvent(149, boost::none, TestEntry(1490));
+}
+
 BOOST_AUTO_TEST_SUITE_END()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core-test/src/reference_test.cpp
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core-test/src/reference_test.cpp b/modules/platforms/cpp/core-test/src/reference_test.cpp
index a5ac559..ec445c7 100644
--- a/modules/platforms/cpp/core-test/src/reference_test.cpp
+++ b/modules/platforms/cpp/core-test/src/reference_test.cpp
@@ -118,32 +118,32 @@ struct C3 : C1, C2
 
 void TestFunction1(Reference<C1> c1, int expected)
 {
-    BOOST_CHECK_EQUAL(c1.Get().c1, expected);
+    BOOST_CHECK_EQUAL(c1.Get()->c1, expected);
 }
 
 void TestFunction2(Reference<C2> c2, int expected)
 {
-    BOOST_CHECK_EQUAL(c2.Get().c2, expected);
+    BOOST_CHECK_EQUAL(c2.Get()->c2, expected);
 }
 
 void TestFunction3(Reference<C3> c3, int expected)
 {
-    BOOST_CHECK_EQUAL(c3.Get().c3, expected);
+    BOOST_CHECK_EQUAL(c3.Get()->c3, expected);
 }
 
 void TestFunctionConst1(ConstReference<C1> c1, int expected)
 {
-    BOOST_CHECK_EQUAL(c1.Get().c1, expected);
+    BOOST_CHECK_EQUAL(c1.Get()->c1, expected);
 }
 
 void TestFunctionConst2(ConstReference<C2> c2, int expected)
 {
-    BOOST_CHECK_EQUAL(c2.Get().c2, expected);
+    BOOST_CHECK_EQUAL(c2.Get()->c2, expected);
 }
 
 void TestFunctionConst3(ConstReference<C3> c3, int expected)
 {
-    BOOST_CHECK_EQUAL(c3.Get().c3, expected);
+    BOOST_CHECK_EQUAL(c3.Get()->c3, expected);
 }
 
 BOOST_AUTO_TEST_SUITE(ReferenceTestSuite)

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/Makefile.am b/modules/platforms/cpp/core/Makefile.am
index 46d6bc9..4de45d3 100644
--- a/modules/platforms/cpp/core/Makefile.am
+++ b/modules/platforms/cpp/core/Makefile.am
@@ -69,6 +69,7 @@ libignite_la_SOURCES = \
     src/impl/transactions/transactions_impl.cpp \
     src/impl/cluster/cluster_group_impl.cpp \
     src/impl/ignite_impl.cpp \
+    src/impl/ignite_binding_impl.cpp \
     src/transactions/transaction.cpp \
     src/transactions/transactions.cpp
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/Makefile.am
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/Makefile.am b/modules/platforms/cpp/core/include/Makefile.am
index 21d3062..0e9a7ec 100644
--- a/modules/platforms/cpp/core/include/Makefile.am
+++ b/modules/platforms/cpp/core/include/Makefile.am
@@ -18,45 +18,55 @@
 ACLOCAL_AMFLAGS =-I m4
 
 nobase_include_HEADERS = \
-    ignite/ignite_configuration.h \
-    ignite/ignite.h \
-    ignite/impl/binary/binary_type_updater_impl.h \
-    ignite/impl/operations.h \
-    ignite/impl/ignite_environment.h \
-    ignite/impl/ignite_impl.h \
-    ignite/impl/cache/query/query_fields_row_impl.h \
-    ignite/impl/cache/query/query_argument.h \
-    ignite/impl/cache/query/query_impl.h \
-    ignite/impl/cache/cache_impl.h \
-    ignite/impl/cache/cache_entry_processor_holder.h \
-    ignite/impl/cache/query/query_batch.h \
-    ignite/impl/interop/interop_target.h \
-    ignite/impl/interop/interop_external_memory.h \
-    ignite/impl/handle_registry.h \
-    ignite/impl/transactions/transaction_impl.h \
-    ignite/impl/transactions/transactions_impl.h \
-    ignite/impl/cluster/cluster_group_impl.h \
-    ignite/impl/ignite_binding_impl.h \
-    ignite/impl/module_manager.h \
-    ignite/cache/query/query_fields_row.h \
+    ignite/cache/cache.h \
+    ignite/cache/cache_entry.h \
+    ignite/cache/cache_entry_processor.h \
+    ignite/cache/cache_peek_mode.h \
+    ignite/cache/event/cache_entry_event.h \
+    ignite/cache/event/cache_entry_event_filter.h \
+    ignite/cache/event/cache_entry_event_listener.h \
+    ignite/cache/mutable_cache_entry.h \
+    ignite/cache/query/continuous/continuous_query.h \
+    ignite/cache/query/continuous/continuous_query_handle.h \
+    ignite/cache/query/query.h \
+    ignite/cache/query/query_cursor.h \
     ignite/cache/query/query_fields_cursor.h \
+    ignite/cache/query/query_fields_row.h \
     ignite/cache/query/query_scan.h \
-    ignite/cache/query/query_cursor.h \
     ignite/cache/query/query_sql.h \
-    ignite/cache/query/query.h \
     ignite/cache/query/query_sql_fields.h \
     ignite/cache/query/query_text.h \
-    ignite/cache/cache.h \
-    ignite/cache/cache_entry.h \
-    ignite/cache/cache_peek_mode.h \
-    ignite/cache/cache_entry_processor.h \
-    ignite/cache/mutable_cache_entry.h \
-    ignite/ignition.h \
+    ignite/ignite.h \
     ignite/ignite_binding.h \
     ignite/ignite_binding_context.h \
+    ignite/ignite_configuration.h \
+    ignite/ignition.h \
+    ignite/impl/binary/binary_type_updater_impl.h \
+    ignite/impl/bindings.h \
+    ignite/impl/cache/cache_entry_processor_holder.h \
+    ignite/impl/cache/cache_impl.h \
+    ignite/impl/cache/event/cache_entry_event_filter_base.h \
+    ignite/impl/cache/event/cache_entry_event_filter_holder.h \
+    ignite/impl/cache/query/continuous/continuous_query_handle_impl.h \
+    ignite/impl/cache/query/continuous/continuous_query_impl.h \
+    ignite/impl/cache/query/query_argument.h \
+    ignite/impl/cache/query/query_batch.h \
+    ignite/impl/cache/query/query_fields_row_impl.h \
+    ignite/impl/cache/query/query_impl.h \
+    ignite/impl/cluster/cluster_group_impl.h \
+    ignite/impl/handle_registry.h \
+    ignite/impl/ignite_binding_impl.h \
+    ignite/impl/ignite_environment.h \
+    ignite/impl/ignite_impl.h \
+    ignite/impl/interop/interop_external_memory.h \
+    ignite/impl/interop/interop_target.h \
+    ignite/impl/module_manager.h \
+    ignite/impl/operations.h \
+    ignite/impl/transactions/transactions_impl.h \
+    ignite/impl/transactions/transaction_impl.h \
     ignite/transactions/transaction.h \
-    ignite/transactions/transaction_consts.h \
     ignite/transactions/transactions.h \
+    ignite/transactions/transaction_consts.h \
     ignite/transactions/transaction_metrics.h
 
 uninstall-hook:

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/cache.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache.h b/modules/platforms/cpp/core/include/ignite/cache/cache.h
index f9c442c..00d1c81 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache.h
@@ -1485,8 +1485,11 @@ namespace ignite
                 const query::continuous::ContinuousQuery<K, V>& qry, IgniteError& err)
             {
                 using namespace impl::cache::query::continuous;
+                using namespace common::concurrent;
 
-                if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener())
+                const SharedPointer<ContinuousQueryImpl<K, V> >& qryImpl = qry.impl;
+
+                if (!qryImpl.IsValid() || !qryImpl.Get()->HasListener())
                 {
                     err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
                         "Event listener is not set for ContinuousQuery instance");
@@ -1494,11 +1497,7 @@ namespace ignite
                     return query::continuous::ContinuousQueryHandle<K, V>();
                 }
 
-                ContinuousQueryHandleImpl* cqImpl;
-                cqImpl = impl.Get()->QueryContinuous(qry.impl, err);
-
-                if (cqImpl)
-                    cqImpl->SetQuery(qry.impl);
+                ContinuousQueryHandleImpl* cqImpl = impl.Get()->QueryContinuous(qryImpl, err);
 
                 return query::continuous::ContinuousQueryHandle<K, V>(cqImpl);
             }
@@ -1538,8 +1537,11 @@ namespace ignite
                 const Q& initialQry, IgniteError& err)
             {
                 using namespace impl::cache::query::continuous;
+                using namespace common::concurrent;
 
-                if (!qry.impl.IsValid() || !qry.impl.Get()->HasListener())
+                const SharedPointer<ContinuousQueryImpl<K, V> >& qryImpl = qry.impl;
+
+                if (!qryImpl.IsValid() || !qryImpl.Get()->HasListener())
                 {
                     err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
                         "Event listener is not set for ContinuousQuery instance");
@@ -1547,11 +1549,7 @@ namespace ignite
                     return query::continuous::ContinuousQueryHandle<K, V>();
                 }
 
-                ContinuousQueryHandleImpl* cqImpl;
-                cqImpl = impl.Get()->QueryContinuous(qry.impl, initialQry, err);
-
-                if (cqImpl)
-                    cqImpl->SetQuery(qry.impl);
+                ContinuousQueryHandleImpl* cqImpl = impl.Get()->QueryContinuous(qryImpl, initialQry, err);
 
                 return query::continuous::ContinuousQueryHandle<K, V>(cqImpl);
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h b/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
index 7fa1550..e0bb694 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/cache_entry_processor.h
@@ -42,17 +42,21 @@ namespace ignite
          * All templated types should be default-constructable,
          * copy-constructable and assignable.
          *
-         * @tparam P The processor itself which inherits from CacheEntryProcessor.
          * @tparam K Key type.
          * @tparam V Value type.
          * @tparam R Process method return type.
          * @tparam A Process method argument type.
          */
-        template<typename P, typename K, typename V, typename R, typename A>
+        template<typename K, typename V, typename R, typename A>
         class CacheEntryProcessor
         {
             friend class ignite::IgniteBinding;
 
+            typedef A ArgumentType;
+            typedef K KeyType;
+            typedef V ValueType;
+            typedef R ReturnType;
+
         public:
             /**
              * Destructor.
@@ -70,40 +74,6 @@ namespace ignite
              * @return Processing result.
              */
             virtual R Process(MutableCacheEntry<K, V>& entry, const A& arg) = 0;
-
-        private:
-            /**
-             * Process input streaming data to produce output streaming data.
-             *
-             * Deserializes cache entry and processor using provided reader, invokes
-             * cache entry processor, gets result and serializes it using provided
-             * writer.
-             *
-             * @param reader Reader.
-             * @param writer Writer.
-             */
-            static void InternalProcess(impl::binary::BinaryReaderImpl& reader, impl::binary::BinaryWriterImpl& writer)
-            {
-                typedef impl::cache::CacheEntryProcessorHolder<P, A> ProcessorHolder;
-
-                ProcessorHolder procHolder = reader.ReadObject<ProcessorHolder>();
-
-                K key = reader.ReadObject<K>();
-
-                V value;
-                bool exists = reader.TryReadObject<V>(value);
-
-                impl::cache::MutableCacheEntryState entryState;
-
-                R res = procHolder.template Process<R, K, V>(key, value, exists, entryState);
-
-                writer.WriteInt8(static_cast<int8_t>(entryState));
-
-                if (entryState == impl::cache::ENTRY_STATE_VALUE_SET)
-                    writer.WriteTopObject(value);
-
-                writer.WriteTopObject(res);
-            }
         };
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h
new file mode 100644
index 0000000..3a4fc74
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/cache/event/cache_entry_event_filter.h
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * @file
+ * Declares ignite::cache::event::CacheEntryEventFilter class.
+ */
+
+#ifndef _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER
+#define _IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER
+
+#include <ignite/cache/event/cache_entry_event.h>
+#include <ignite/impl/cache/event/cache_entry_event_filter_base.h>
+
+namespace ignite
+{
+    class IgniteBinding;
+
+    namespace impl
+    {
+        namespace cache
+        {
+            namespace event
+            {
+                template<typename T>
+                class CacheEntryEventFilterHolder;
+            }
+        }
+    }
+
+    namespace cache
+    {
+        namespace event
+        {
+            /**
+             * Cache entry event filter.
+             *
+             * All templated types should be default-constructible,
+             * copy-constructible and assignable.
+             *
+             * @tparam K Key type.
+             * @tparam V Value type.
+             */
+            template<typename K, typename V>
+            class CacheEntryEventFilter : private impl::cache::event::CacheEntryEventFilterBase
+            {
+                template<typename T>
+                friend class impl::cache::event::CacheEntryEventFilterHolder;
+
+            public:
+                /**
+                 * Default constructor.
+                 */
+                CacheEntryEventFilter()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Destructor.
+                 */
+                virtual ~CacheEntryEventFilter()
+                {
+                    // No-op.
+                }
+
+                /**
+                 * Event callback.
+                 *
+                 * @param event Event.
+                 * @return True if the event passes filter.
+                 */
+                virtual bool Process(const CacheEntryEvent<K, V>& event) = 0;
+
+            private:
+                /**
+                 * Process serialized events.
+                 *
+                 * @param reader Reader for a serialized event.
+                 * @return Filter evaluation result.
+                 */
+                virtual bool ReadAndProcessEvent(binary::BinaryRawReader& reader)
+                {
+                    CacheEntryEvent<K, V> event;
+
+                    event.Read(reader);
+
+                    return Process(event);
+                }
+            };
+        }
+    }
+}
+
+#endif //_IGNITE_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
index 82bb125..0c1146b 100644
--- a/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
+++ b/modules/platforms/cpp/core/include/ignite/cache/query/continuous/continuous_query.h
@@ -24,7 +24,9 @@
 #define _IGNITE_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY
 
 #include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
+
 #include <ignite/cache/event/cache_entry_event_listener.h>
+#include <ignite/cache/event/cache_entry_event_filter.h>
 
 namespace ignite
 {
@@ -83,7 +85,7 @@ namespace ignite
                      *     continuous query execution has been started.
                      */
                     ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr) :
-                        impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr))
+                        impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, false))
                     {
                         // No-op.
                     }
@@ -102,6 +104,37 @@ namespace ignite
                     }
 
                     /**
+                     * Constructor.
+                     *
+                     * @param lsnr Event listener. Invoked on the node where
+                     *     continuous query execution has been started.
+                     * @param remoteFilter Remote filter.
+                     */
+                    template<typename F>
+                    ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr,
+                        const Reference<F>& remoteFilter) :
+                        impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, false, remoteFilter))
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Constructor.
+                     *
+                     * @param lsnr Event listener. Invoked on the node where
+                     *     continuous query execution has been started.
+                     * @param remoteFilter Remote filter.
+                     * @param loc Whether query should be executed locally.
+                     */
+                    template<typename F>
+                    ContinuousQuery(Reference<event::CacheEntryEventListener<K, V> > lsnr,
+                        const Reference<F>& remoteFilter, bool loc) :
+                        impl(new impl::cache::query::continuous::ContinuousQueryImpl<K, V>(lsnr, loc, remoteFilter))
+                    {
+                        // No-op.
+                    }
+
+                    /**
                      * Set local flag.
                      *
                      * @param val Value of the flag. If true, query will be

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/ignite_binding.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/ignite_binding.h b/modules/platforms/cpp/core/include/ignite/ignite_binding.h
index a8decf9..a84a1c1 100644
--- a/modules/platforms/cpp/core/include/ignite/ignite_binding.h
+++ b/modules/platforms/cpp/core/include/ignite/ignite_binding.h
@@ -22,6 +22,7 @@
 #include <ignite/common/concurrent.h>
 
 #include <ignite/impl/ignite_binding_impl.h>
+#include <ignite/impl/bindings.h>
 
 namespace ignite
 {
@@ -53,12 +54,10 @@ namespace ignite
         }
 
         /**
-         * Register Type as Cache Entry Processor.
+         * Register type as Cache Entry Processor.
          *
          * Registred type should be a child of ignite::cache::CacheEntryProcessor
          * class.
-         *
-         * This method should only be used on the valid instance.
          */
         template<typename P>
         void RegisterCacheEntryProcessor()
@@ -76,8 +75,6 @@ namespace ignite
          * Registred type should be a child of ignite::cache::CacheEntryProcessor
          * class.
          *
-         * This method should only be used on the valid instance.
-         *
          * @param err Error.
          */
         template<typename P>
@@ -87,7 +84,11 @@ namespace ignite
             impl::IgniteBindingImpl *im = impl.Get();
 
             if (im)
-                im->RegisterCallback(bt.GetTypeId(), &P::CacheEntryProcessor::InternalProcess, err);
+            {
+                im->RegisterCallback(impl::IgniteBindingImpl::CACHE_ENTRY_PROCESSOR_APPLY,
+                    bt.GetTypeId(), impl::binding::ListenerApply<P, typename P::KeyType,
+                        typename P::ValueType, typename P::ReturnType, typename P::ArgumentType>, err);
+            }
             else
             {
                 err = IgniteError(IgniteError::IGNITE_ERR_GENERIC,
@@ -96,6 +97,32 @@ namespace ignite
         }
 
         /**
+         * Register type as Cache Entry Event Filter.
+         *
+         * Registered type should be a child of ignite::cache::event::CacheEntryEventFilter
+         * class.
+         */
+        template<typename F>
+        void RegisterCacheEntryEventFilter()
+        {
+            binary::BinaryType<F> bt;
+            impl::IgniteBindingImpl *im = impl.Get();
+
+            int32_t typeId = bt.GetTypeId();
+
+            if (im)
+            {
+                im->RegisterCallback(impl::IgniteBindingImpl::CACHE_ENTRY_FILTER_CREATE,
+                    typeId, impl::binding::FilterCreate<F>);
+            }
+            else
+            {
+                throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                    "Instance is not usable (did you check for error?).");
+            }
+        }
+
+        /**
          * Check if the instance is valid.
          *
          * Invalid instance can be returned if some of the previous operations

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h b/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
index 1a6d26d..4d8a7a7 100644
--- a/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
+++ b/modules/platforms/cpp/core/include/ignite/ignite_binding_context.h
@@ -70,7 +70,7 @@ namespace ignite
          * @param cfg Configuration.
          * @param binding Binding.
          */
-        IgniteBindingContext(const IgniteConfiguration& cfg, IgniteBinding binding) :
+        IgniteBindingContext(const IgniteConfiguration& cfg, const IgniteBinding& binding) :
             cfg(cfg),
             binding(binding)
         {

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/bindings.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/bindings.h b/modules/platforms/cpp/core/include/ignite/impl/bindings.h
new file mode 100644
index 0000000..ce77672
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/bindings.h
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_BINDINGS
+#define _IGNITE_IMPL_BINDINGS
+
+#include <stdint.h>
+
+#include <ignite/impl/binary/binary_reader_impl.h>
+#include <ignite/impl/ignite_environment.h>
+#include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
+#include <ignite/impl/cache/cache_entry_processor_holder.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace binding
+        {
+            /**
+             * Binding for filter creation.
+             * 
+             * @tparam F The filter which inherits from CacheEntryEventFilter.
+             *
+             * @param reader Reader.
+             * @param env Environment.
+             * @return Handle for the filter.
+             */
+            template<typename F>
+            int64_t FilterCreate(binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl&, IgniteEnvironment& env)
+            {
+                using namespace common::concurrent;
+                using namespace cache::query::continuous;
+
+                F filter = reader.ReadObject<F>();
+
+                SharedPointer<ContinuousQueryImplBase> qry(new RemoteFilterHolder(MakeReferenceFromCopy(filter)));
+
+                return env.GetHandleRegistry().Allocate(qry);
+            }
+
+            /**
+             * Process input streaming data to produce output streaming data.
+             *
+             * Deserializes cache entry and processor using provided reader, invokes
+             * cache entry processor, gets result and serializes it using provided
+             * writer.
+             *
+             * @param reader Reader.
+             * @param writer Writer.
+             */
+            template<typename P, typename K, typename V, typename R, typename A>
+            int64_t ListenerApply(binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer, IgniteEnvironment&)
+            {
+                typedef cache::CacheEntryProcessorHolder<P, A> ProcessorHolder;
+
+                ProcessorHolder procHolder = reader.ReadObject<ProcessorHolder>();
+
+                K key = reader.ReadObject<K>();
+
+                V value;
+                bool exists = reader.TryReadObject<V>(value);
+
+                cache::MutableCacheEntryState entryState;
+
+                R res = procHolder.template Process<R, K, V>(key, value, exists, entryState);
+
+                writer.WriteInt8(static_cast<int8_t>(entryState));
+
+                if (entryState == cache::ENTRY_STATE_VALUE_SET)
+                    writer.WriteTopObject(value);
+
+                writer.WriteTopObject(res);
+
+                return 0;
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_BINDINGS

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
index 23b57c3..c979b4a 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_entry_processor_holder.h
@@ -75,20 +75,6 @@ namespace ignite
             /**
              * Holder for the Cache Entry Processor and its argument. Used as a convenient way to
              * transmit Cache Entry Processor between nodes.
-             *
-             * Both key and value types should be default-constructable,
-             * copy-constructable and assignable.
-             *
-             * Additionally, for the processor class public methods with the
-             * following signatures should be defined:
-             * @code{.cpp}
-             * // Should return unique ID for every class.
-             * static int64_t GetJobId();
-             *
-             * // Main processing method. Takes cache entry and argument and
-             * // returns processing result.
-             * R Process(ignite::cache::MutableCacheEntry<K, V>&, const A&);
-             * @endcode
              */
             template<typename P, typename A>
             class CacheEntryProcessorHolder
@@ -202,7 +188,6 @@ namespace ignite
             typedef impl::cache::CacheEntryProcessorHolder<P, A> UnderlyingType;
 
             IGNITE_BINARY_GET_FIELD_ID_AS_HASH
-            IGNITE_BINARY_GET_HASH_CODE_ZERO(UnderlyingType)
             IGNITE_BINARY_IS_NULL_FALSE(UnderlyingType)
             IGNITE_BINARY_GET_NULL_DEFAULT_CTOR(UnderlyingType)
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
index e6cfbab..4599522 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/cache_impl.h
@@ -22,9 +22,7 @@
 #include <ignite/cache/query/query_sql.h>
 #include <ignite/cache/query/query_text.h>
 #include <ignite/cache/query/query_sql_fields.h>
-#include <ignite/cache/query/continuous/continuous_query_handle.h>
 #include <ignite/impl/cache/query/query_impl.h>
-#include <ignite/impl/cache/query/continuous/continuous_query_handle_impl.h>
 #include <ignite/impl/cache/query/continuous/continuous_query_impl.h>
 
 #include <ignite/impl/interop/interop_target.h>
@@ -35,6 +33,15 @@ namespace ignite
     {
         namespace cache
         {
+            namespace query
+            {
+                namespace continuous
+                {
+                    /* Forward declaration. */
+                    class ContinuousQueryHandleImpl;
+                }
+            }
+
             /**
              * Cache implementation.
              */
@@ -402,30 +409,7 @@ namespace ignite
                  * @param err Error.
                  */
                 template<typename T>
-                query::QueryCursorImpl* QueryInternal(const T& qry, int32_t typ, IgniteError& err)
-                {
-                    ignite::jni::java::JniErrorInfo jniErr;
-
-                    ignite::common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
-                    interop::InteropMemory* mem0 = mem.Get();
-                    interop::InteropOutputStream out(mem0);
-                    binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
-                    ignite::binary::BinaryRawWriter rawWriter(&writer);
-
-                    qry.Write(rawWriter);
-
-                    out.Synchronize();
-
-                    jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpQueryCursor(GetTarget(),
-                        typ, mem.Get()->PointerLong(), &jniErr);
-
-                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
-
-                    if (jniErr.code == ignite::java::IGNITE_JNI_ERR_SUCCESS)
-                        return new query::QueryCursorImpl(GetEnvironmentPointer(), qryJavaRef);
-                    else
-                        return 0;
-                }
+                query::QueryCursorImpl* QueryInternal(const T& qry, int32_t typ, IgniteError& err);
 
                 /**
                  * Start continuous query execution with the initial query.
@@ -438,50 +422,7 @@ namespace ignite
                 template<typename T>
                 query::continuous::ContinuousQueryHandleImpl* QueryContinuous(
                     const common::concurrent::SharedPointer<query::continuous::ContinuousQueryImplBase> qry,
-                    const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err)
-                {
-                    jni::java::JniErrorInfo jniErr;
-
-                    common::concurrent::SharedPointer<interop::InteropMemory> mem = GetEnvironment().AllocateMemory();
-                    interop::InteropMemory* mem0 = mem.Get();
-                    interop::InteropOutputStream out(mem0);
-                    binary::BinaryWriterImpl writer(&out, GetEnvironment().GetTypeManager());
-                    ignite::binary::BinaryRawWriter rawWriter(&writer);
-
-                    const query::continuous::ContinuousQueryImplBase& qry0 = *qry.Get();
-
-                    int64_t handle = GetEnvironment().GetHandleRegistry().Allocate(qry);
-
-                    rawWriter.WriteInt64(handle);
-                    rawWriter.WriteBool(qry0.GetLocal());
-
-                    // Filters are not supported for now.
-                    rawWriter.WriteBool(false);
-                    rawWriter.WriteNull();
-
-                    rawWriter.WriteInt32(qry0.GetBufferSize());
-                    rawWriter.WriteInt64(qry0.GetTimeInterval());
-
-                    // Autounsubscribe is a filter feature.
-                    rawWriter.WriteBool(false);
-
-                    // Writing initial query. When there is not initial query writing -1.
-                    rawWriter.WriteInt32(typ);
-                    if (typ != -1)
-                        initialQry.Write(rawWriter);
-
-                    out.Synchronize();
-
-                    jobject qryJavaRef = GetEnvironment().Context()->CacheOutOpContinuousQuery(GetTarget(),
-                        cmd, mem.Get()->PointerLong(), &jniErr);
-
-                    IgniteError::SetError(jniErr.code, jniErr.errCls, jniErr.errMsg, err);
-
-                    if (jniErr.code == java::IGNITE_JNI_ERR_SUCCESS)
-                        return new query::continuous::ContinuousQueryHandleImpl(GetEnvironmentPointer(), handle, qryJavaRef);
-
-                    return 0;
-                }
+                    const T& initialQry, int32_t typ, int32_t cmd, IgniteError& err);
             };
         }
     }    

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h
new file mode 100644
index 0000000..a0e1cb6
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_base.h
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE
+#define _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE
+
+#include <ignite/binary/binary_raw_reader.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace cache
+        {
+            namespace event
+            {
+                /**
+                 * Base for the Cache Entry Event Filter.
+                 */
+                class CacheEntryEventFilterBase
+                {
+                public:
+                    /**
+                     * Default constructor.
+                     */
+                    CacheEntryEventFilterBase()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Destructor.
+                     */
+                    virtual ~CacheEntryEventFilterBase()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Process serialized events.
+                     *
+                     * @param reader Reader for a serialized event.
+                     * @return Filter evaluation result.
+                     */
+                    virtual bool ReadAndProcessEvent(ignite::binary::BinaryRawReader& reader) = 0;
+                };
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_BASE

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h
new file mode 100644
index 0000000..4256f2b
--- /dev/null
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/event/cache_entry_event_filter_holder.h
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER
+#define _IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER
+
+#include <ignite/reference.h>
+
+namespace ignite
+{
+    namespace impl
+    {
+        namespace cache
+        {
+            namespace event
+            {
+                /* Forward declaration. */
+                class CacheEntryEventFilterBase;
+
+                class CacheEntryEventFilterHolderBase
+                {
+                public:
+                    /**
+                     * Destructor.
+                     */
+                    virtual ~CacheEntryEventFilterHolderBase()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Write the held filter using the provided writer.
+                     *
+                     * @param writer Writer.
+                     */
+                    virtual void Write(binary::BinaryWriterImpl& writer) = 0;
+
+                    /**
+                     * Get filter pointer.
+                     *
+                     * @return Filter.
+                     */
+                    virtual CacheEntryEventFilterBase* GetFilter() = 0;
+                };
+
+                /**
+                 * Holder for the Cache Entry Event Filter.
+                 */
+                template<typename F>
+                class CacheEntryEventFilterHolder : public CacheEntryEventFilterHolderBase
+                {
+                public:
+                    typedef F FilterType;
+
+                    /**
+                     * Default constructor.
+                     */
+                    CacheEntryEventFilterHolder() :
+                        filter()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Constructor.
+                     *
+                     * @param filter Filter.
+                     */
+                    CacheEntryEventFilterHolder(const Reference<FilterType>& filter) :
+                        filter(filter)
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Destructor.
+                     */
+                    virtual ~CacheEntryEventFilterHolder()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Write a presence flag and the filter (false and null if no filter is set).
+                     *
+                     * @param writer Writer.
+                     */
+                    virtual void Write(binary::BinaryWriterImpl& writer)
+                    {
+                        if (!filter.IsNull())
+                        {
+                            writer.WriteBool(true);
+                            writer.WriteObject<FilterType>(*filter.Get());
+                        }
+                        else
+                        {
+                            writer.WriteBool(false);
+                            writer.WriteNull();
+                        }
+                    }
+
+                    /**
+                     * Get filter pointer.
+                     *
+                     * @return Filter.
+                     */
+                    virtual CacheEntryEventFilterBase* GetFilter()
+                    {
+                        return filter.Get();
+                    }
+
+                private:
+                    /** Stored filter. */
+                    Reference<FilterType> filter;
+                };
+
+                template<>
+                class CacheEntryEventFilterHolder<void> : public CacheEntryEventFilterHolderBase
+                {
+                public:
+                    /**
+                     * Default constructor.
+                     */
+                    CacheEntryEventFilterHolder()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Constructor.
+                     */
+                    CacheEntryEventFilterHolder(const Reference<void>&)
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Destructor.
+                     */
+                    virtual ~CacheEntryEventFilterHolder()
+                    {
+                        // No-op.
+                    }
+
+                    /**
+                     * Write the "no filter" marker: a false flag followed by null.
+                     *
+                     * @param writer Writer.
+                     */
+                    virtual void Write(binary::BinaryWriterImpl& writer)
+                    {
+                        writer.WriteBool(false);
+                        writer.WriteNull();
+                    }
+
+                    /**
+                     * Get filter pointer.
+                     *
+                     * @return Filter.
+                     */
+                    virtual CacheEntryEventFilterBase* GetFilter()
+                    {
+                        return 0;
+                    }
+                };
+            }
+        }
+    }
+}
+
+#endif //_IGNITE_IMPL_CACHE_EVENT_CACHE_ENTRY_EVENT_FILTER_HOLDER

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
index 75504b1..07facff 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_handle_impl.h
@@ -66,13 +66,6 @@ namespace ignite
                          */
                         QueryCursorImpl* GetInitialQueryCursor(IgniteError& err);
 
-                        /**
-                         * Set query to keep pointer to.
-                         *
-                         * @param query Query.
-                         */
-                        void SetQuery(SP_ContinuousQueryImplBase query);
-
                     private:
                         /** Environment. */
                         SP_IgniteEnvironment env;
@@ -83,9 +76,6 @@ namespace ignite
                         /** Handle to Java object. */
                         jobject javaRef;
 
-                        /** Shared pointer to query. Kept for query to live long enough. */
-                        SP_ContinuousQueryImplBase qry;
-
                         /** Mutex. */
                         common::concurrent::CriticalSection mutex;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
index 2a24e5f..d2bf241 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/cache/query/continuous/continuous_query_impl.h
@@ -24,11 +24,13 @@
 #define _IGNITE_IMPL_CACHE_QUERY_CONTINUOUS_CONTINUOUS_QUERY_IMPL
 
 #include <stdint.h>
+#include <memory>
 
 #include <ignite/reference.h>
 
 #include <ignite/cache/event/cache_entry_event_listener.h>
 #include <ignite/binary/binary_raw_reader.h>
+#include <ignite/impl/cache/event/cache_entry_event_filter_holder.h>
 
 namespace ignite
 {
@@ -80,10 +82,11 @@ namespace ignite
                          *
                          * @param loc Whether query should be executed locally.
                          */
-                        explicit ContinuousQueryImplBase(bool loc) :
+                        explicit ContinuousQueryImplBase(bool loc, event::CacheEntryEventFilterHolderBase* filterOp) :
                             local(loc),
                             bufferSize(DEFAULT_BUFFER_SIZE),
-                            timeInterval(DEFAULT_TIME_INTERVAL)
+                            timeInterval(DEFAULT_TIME_INTERVAL),
+                            filterOp(filterOp)
                         {
                             // No-op.
                         }
@@ -183,6 +186,16 @@ namespace ignite
                         }
 
                         /**
+                         * Get remote filter holder.
+                         *
+                         * @return Filter holder.
+                         */
+                        event::CacheEntryEventFilterHolderBase& GetFilterHolder() const
+                        {
+                            return *filterOp;
+                        }
+
+                        /**
                          * Callback that reads and processes cache events.
                          *
                          * @param reader Reader to use.
@@ -221,6 +234,9 @@ namespace ignite
                          * sent only when buffer is full.
                          */
                         int64_t timeInterval;
+
+                        /** Cache entry event filter holder. */
+                        std::auto_ptr<event::CacheEntryEventFilterHolderBase> filterOp;
                     };
 
                     /**
@@ -252,11 +268,13 @@ namespace ignite
                         /**
                          * Constructor.
                          *
-                         * @param lsnr Event listener. Invoked on the node where
+                         * @param lsnr Event listener. Invoked on the node where
                          *     continuous query execution has been started.
+                         * @param loc Whether query should be executed locally.
                          */
-                        ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr) :
-                            ContinuousQueryImplBase(false),
+                        ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr,
+                            bool loc) :
+                            ContinuousQueryImplBase(loc, new event::CacheEntryEventFilterHolder<void>()),
                             lsnr(lsnr)
                         {
                             // No-op.
@@ -269,8 +287,10 @@ namespace ignite
                          *     continuous query execution has been started.
                          * @param loc Whether query should be executed locally.
                          */
-                        ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr, bool loc) :
-                            ContinuousQueryImplBase(loc),
+                        template<typename F>
+                        ContinuousQueryImpl(Reference<ignite::cache::event::CacheEntryEventListener<K, V> >& lsnr,
+                            bool loc, const Reference<F>& filter) :
+                            ContinuousQueryImplBase(loc, new event::CacheEntryEventFilterHolder<F>(filter)),
                             lsnr(lsnr)
                         {
                             // No-op.
@@ -335,13 +355,37 @@ namespace ignite
                             for (int32_t i = 0; i < cnt; ++i)
                                 events[i].Read(reader);
 
-                            lsnr.Get().OnEvent(events.data(), cnt);
+                            lsnr.Get()->OnEvent(events.data(), cnt);
                         }
 
                     private:
                         /** Cache entry event listener. */
                         Reference<ignite::cache::event::CacheEntryEventListener<K, V> > lsnr;
                     };
+
+                    /**
+                     * Used to store filter on remote nodes where no
+                     * ContinuousQuery instance was actually created.
+                     */
+                    class RemoteFilterHolder : public ContinuousQueryImplBase
+                    {
+                    public:
+                        /**
+                         * Constructor.
+                         */
+                        template<typename F>
+                        RemoteFilterHolder(const Reference<F>& filter):
+                            ContinuousQueryImplBase(false, new event::CacheEntryEventFilterHolder<F>(filter))
+                        {
+                            // No-op.
+                        }
+
+                        virtual void ReadAndProcessEvents(ignite::binary::BinaryRawReader&)
+                        {
+                            throw IgniteError(IgniteError::IGNITE_ERR_GENERIC,
+                                "No listener is registered for the ContinuousQuery instance");
+                        }
+                    };
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
index 32de2cb..7b20c50 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_binding_impl.h
@@ -18,6 +18,7 @@
 #ifndef _IGNITE_IMPL_IGNITE_BINDING_IMPL
 #define _IGNITE_IMPL_IGNITE_BINDING_IMPL
 
+#include <stdint.h>
 #include <map>
 
 #include <ignite/common/common.h>
@@ -29,6 +30,9 @@ namespace ignite
 {
     namespace impl
     {
+        /* Forward declaration. */
+        class IgniteEnvironment;
+
         /**
          * Ignite binding implementation.
          *
@@ -36,16 +40,24 @@ namespace ignite
          */
         class IgniteBindingImpl
         {
-            typedef void (Callback)(binary::BinaryReaderImpl&, binary::BinaryWriterImpl&);
+            typedef int64_t(Callback)(binary::BinaryReaderImpl&, binary::BinaryWriterImpl&, IgniteEnvironment&);
 
         public:
+            enum CallbackType
+            {
+                CACHE_ENTRY_PROCESSOR_APPLY = 1,
+
+                CACHE_ENTRY_FILTER_CREATE = 2,
+
+                CACHE_ENTRY_FILTER_APPLY = 3,
+            };
+
             /**
-             * Default constructor.
+             * Constructor.
+             *
+             * @param env Environment.
              */
-            IgniteBindingImpl() : callbacks()
-            {
-                // No-op.
-            }
+            IgniteBindingImpl(IgniteEnvironment &env);
 
             /**
              * Invoke callback using provided ID.
@@ -53,31 +65,15 @@ namespace ignite
              * Deserializes data and callback itself, invokes callback and
              * serializes processing result using provided reader and writer.
              *
-             * @param id Processor ID.
+             * @param type Callback Type.
+             * @param id Callback ID.
              * @param reader Reader.
              * @param writer Writer.
-             * @return True if callback is registered and false otherwise.
+             * @param found Output param. True if callback was found and false otherwise.
+             * @return Callback return value.
              */
-            bool InvokeCallbackById(int64_t id, binary::BinaryReaderImpl& reader, binary::BinaryWriterImpl& writer)
-            {
-                common::concurrent::CsLockGuard guard(lock);
-
-                std::map<int64_t, Callback*>::iterator it = callbacks.find(id);
-
-                if (it != callbacks.end())
-                {
-                    Callback* callback = it->second;
-
-                    // We have found callback and does not need lock here anymore.
-                    guard.Reset();
-
-                    callback(reader, writer);
-
-                    return true;
-                }
-
-                return false;
-            }
+            IGNITE_IMPORT_EXPORT int64_t InvokeCallback(bool& found, int32_t type, int32_t id, binary::BinaryReaderImpl& reader,
+                binary::BinaryWriterImpl& writer);
 
             /**
              * Register cache entry processor and associate it with provided ID.
@@ -85,29 +81,42 @@ namespace ignite
              * @throw IgniteError another processor is already associated with
              *     the given ID.
              *
-             * @param id Identifier for processor to be associated with.
-             * @param proc Callback.
+             * @param type Callback type.
+             * @param id Callback identifier.
+             * @param callback Callback.
+             * @param err Error.
              */
-            void RegisterCallback(int64_t id, Callback* proc, IgniteError& err)
-            {
-                common::concurrent::CsLockGuard guard(lock);
-
-                bool inserted = callbacks.insert(std::make_pair(id, proc)).second;
-
-                guard.Reset();
-
-                if (!inserted)
-                {
-                    std::stringstream builder;
+            IGNITE_IMPORT_EXPORT void RegisterCallback(int32_t type, int32_t id, Callback* callback, IgniteError& err);
+            
+            /**
+             * Register cache entry processor and associate it with provided ID.
+             *
+             * @throw IgniteError another processor is already associated with
+             *     the given ID.
+             *
+             * @param type Callback type.
+             * @param id Callback identifier.
+             * @param callback Callback.
+             */
+            IGNITE_IMPORT_EXPORT void RegisterCallback(int32_t type, int32_t id, Callback* callback);
 
-                    builder << "Trying to register multiple PRC callbacks with the same ID. [id=" << id << ']';
+        private:
+            IGNITE_NO_COPY_ASSIGNMENT(IgniteBindingImpl);
 
-                    err = IgniteError(IgniteError::IGNITE_ERR_ENTRY_PROCESSOR, builder.str().c_str());
-                }
+            /**
+             * Make key out of callback's type (high 32 bits) and ID (low 32 bits).
+             *
+             * @param type Callback Type.
+             * @param id Callback ID. Widened as unsigned so a negative ID cannot sign-extend over the type bits.
+             * @return Key for callback.
+             */
+            int64_t makeKey(int32_t type, int32_t id)
+            {
+                return (static_cast<int64_t>(type) << 32) | static_cast<uint32_t>(id);
+            }
 
-        private:
-            IGNITE_NO_COPY_ASSIGNMENT(IgniteBindingImpl);
+            /** Ignite environment. */
+            IgniteEnvironment& env;
 
             /** Registered callbacks. */
             std::map<int64_t, Callback*> callbacks;

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
index 5fc9a27..e3cb859 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_environment.h
@@ -21,19 +21,20 @@
 #include <ignite/common/concurrent.h>
 #include <ignite/jni/java.h>
 #include <ignite/jni/utils.h>
-#include <ignite/ignite_binding_context.h>
 #include <ignite/ignite_configuration.h>
 
-#include "ignite/impl/interop/interop_memory.h"
-#include "ignite/impl/binary/binary_type_manager.h"
-#include "ignite/impl/handle_registry.h"
-#include "ignite/impl/module_manager.h"
-#include "ignite/impl/ignite_binding_impl.h"
+#include <ignite/impl/interop/interop_memory.h>
+#include <ignite/impl/binary/binary_type_manager.h>
+#include <ignite/impl/handle_registry.h>
 
 namespace ignite
 {
     namespace impl
     {
+        /* Forward declarations. */
+        class IgniteBindingImpl;
+        class ModuleManager;
+
         /**
          * Defines environment in which Ignite operates.
          */
@@ -110,6 +111,21 @@ namespace ignite
             void OnContinuousQueryListenerApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
 
             /**
+             * Continuous query filter create callback.
+             *
+             * @param mem Memory with data.
+             * @return Filter handle.
+             */
+            int64_t OnContinuousQueryFilterCreate(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+            /**
+             * Continuous query filter apply callback.
+             *
+             * @param mem Memory with data.
+             */
+            int64_t OnContinuousQueryFilterApply(common::concurrent::SharedPointer<interop::InteropMemory>& mem);
+
+            /**
              * Cache Invoke callback.
              *
              * @param mem Input-output memory.
@@ -191,14 +207,7 @@ namespace ignite
              *
              * @return IgniteBinding instance.
              */
-            IgniteBinding GetBinding() const;
-
-            /**
-             * Get binding context.
-             *
-             * @return Binding context.
-             */
-            IgniteBindingContext GetBindingContext() const;
+            common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding() const;
 
         private:
             /** Node configuration. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
index 24fc989..5b1f527 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/ignite_impl.h
@@ -22,10 +22,10 @@
 #include <ignite/jni/java.h>
 #include <ignite/common/utils.h>
 
-#include "ignite/impl/cache/cache_impl.h"
-#include "ignite/impl/transactions/transactions_impl.h"
-#include "ignite/impl/cluster/cluster_group_impl.h"
-#include "ignite/impl/ignite_environment.h"
+#include <ignite/impl/cache/cache_impl.h>
+#include <ignite/impl/transactions/transactions_impl.h>
+#include <ignite/impl/cluster/cluster_group_impl.h>
+#include <ignite/impl/ignite_environment.h>
 
 namespace ignite 
 {
@@ -154,7 +154,7 @@ namespace ignite
              *
              * @return IgniteBinding class instance.
              */
-            IgniteBinding GetBinding();
+            common::concurrent::SharedPointer<IgniteBindingImpl> GetBinding();
 
             /**
              * Get instance of the implementation from the proxy class.

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/include/ignite/impl/operations.h
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/include/ignite/impl/operations.h b/modules/platforms/cpp/core/include/ignite/impl/operations.h
index dfaa4e8..fff8a86 100644
--- a/modules/platforms/cpp/core/include/ignite/impl/operations.h
+++ b/modules/platforms/cpp/core/include/ignite/impl/operations.h
@@ -79,7 +79,7 @@ namespace ignite
             }
         private:
             /** Value. */
-            const T val;
+            const T& val;
 
             IGNITE_NO_COPY_ASSIGNMENT(In1Operation)
         };

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/project/vs/core.vcxproj
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj b/modules/platforms/cpp/core/project/vs/core.vcxproj
index b490887..b5a95bd 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj
@@ -195,6 +195,7 @@
     <ClInclude Include="..\..\include\ignite\cache\cache_entry_processor.h" />
     <ClInclude Include="..\..\include\ignite\cache\cache_peek_mode.h" />
     <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event.h" />
+    <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_filter.h" />
     <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_listener.h" />
     <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query.h" />
     <ClInclude Include="..\..\include\ignite\cache\query\continuous\continuous_query_handle.h" />
@@ -212,9 +213,12 @@
     <ClInclude Include="..\..\include\ignite\ignite_configuration.h" />
     <ClInclude Include="..\..\include\ignite\ignition.h" />
     <ClInclude Include="..\..\include\ignite\impl\binary\binary_type_updater_impl.h" />
+    <ClInclude Include="..\..\include\ignite\impl\bindings.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\cache_entry_processor_holder.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\query\query_argument.h" />
+    <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_base.h" />
+    <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_holder.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\query\query_batch.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_handle_impl.h" />
     <ClInclude Include="..\..\include\ignite\impl\cache\query\continuous\continuous_query_impl.h" />
@@ -246,6 +250,7 @@
     <ClCompile Include="..\..\src\impl\cache\query\continuous\continuous_query_handle_impl.cpp" />
     <ClCompile Include="..\..\src\impl\cache\query\query_impl.cpp" />
     <ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp" />
+    <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp" />
     <ClCompile Include="..\..\src\impl\ignite_environment.cpp" />
     <ClCompile Include="..\..\src\impl\ignite_impl.cpp" />
     <ClCompile Include="..\..\src\impl\handle_registry.cpp" />

http://git-wip-us.apache.org/repos/asf/ignite/blob/d4da92b7/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
----------------------------------------------------------------------
diff --git a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
index b75b3b2..3b17d53 100644
--- a/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
+++ b/modules/platforms/cpp/core/project/vs/core.vcxproj.filters
@@ -52,6 +52,9 @@
     <ClCompile Include="..\..\src\impl\cluster\cluster_group_impl.cpp">
       <Filter>Code\impl\cluster</Filter>
     </ClCompile>
+    <ClCompile Include="..\..\src\impl\ignite_binding_impl.cpp">
+      <Filter>Code\impl</Filter>
+    </ClCompile>
   </ItemGroup>
   <ItemGroup>
     <ClInclude Include="..\..\include\ignite\impl\cache\cache_impl.h">
@@ -192,6 +195,18 @@
     <ClInclude Include="..\..\include\ignite\impl\cache\query\query_argument.h">
       <Filter>Code\impl\cache\query</Filter>
     </ClInclude>
+    <ClInclude Include="..\..\include\ignite\cache\event\cache_entry_event_filter.h">
+      <Filter>Code\cache\event</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_base.h">
+      <Filter>Code\impl\cache\event</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\cache\event\cache_entry_event_filter_holder.h">
+      <Filter>Code\impl\cache\event</Filter>
+    </ClInclude>
+    <ClInclude Include="..\..\include\ignite\impl\bindings.h">
+      <Filter>Code\impl</Filter>
+    </ClInclude>
   </ItemGroup>
   <ItemGroup>
     <Filter Include="Code">
@@ -236,5 +251,8 @@
     <Filter Include="Code\impl\cluster">
       <UniqueIdentifier>{f5b54635-91a1-447e-923a-1b4608d7e5bc}</UniqueIdentifier>
     </Filter>
+    <Filter Include="Code\impl\cache\event">
+      <UniqueIdentifier>{9c5e9732-755a-4553-8926-b4cf3b6abaf3}</UniqueIdentifier>
+    </Filter>
   </ItemGroup>
 </Project>
\ No newline at end of file


Mime
View raw message