ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shro...@apache.org
Subject ignite git commit: Fix for IGNITE-2730: Remote filter does not know cache name.
Date Fri, 01 Apr 2016 01:11:28 GMT
Repository: ignite
Updated Branches:
  refs/heads/master bd509bec4 -> 04eb6c966


Fix for IGNITE-2730: Remote filter does not know cache name.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04eb6c96
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04eb6c96
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04eb6c96

Branch: refs/heads/master
Commit: 04eb6c966c6f8a6e59d99e52e5967610d10ad9f7
Parents: bd509be
Author: shtykh_roman <rshtykh@yahoo.com>
Authored: Fri Apr 1 10:10:48 2016 +0900
Committer: shtykh_roman <rshtykh@yahoo.com>
Committed: Fri Apr 1 10:10:48 2016 +0900

----------------------------------------------------------------------
 modules/kafka/README.txt                             | 10 +++++-----
 .../stream/kafka/connect/IgniteSourceTask.java       | 15 +++++++++++----
 2 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/04eb6c96/modules/kafka/README.txt
----------------------------------------------------------------------
diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt
index 3a1a5aa..259a4f6 100644
--- a/modules/kafka/README.txt
+++ b/modules/kafka/README.txt
@@ -77,7 +77,7 @@ offset.flush.interval.ms=10000
 ```
 # connector
 name=string-ignite-connector
-connector.class=IgniteSinkConnector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector
 tasks.max=2
 topics=testTopic1,testTopic2
 
@@ -114,7 +114,7 @@ bin/kafka-server-start.sh config/server.properties
 
 3. Provide some data input to the Kafka server
 ```
-bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true
--operty key.separator=,
+bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true
--property key.separator=,
 k1,v1
 ```
 
@@ -141,7 +141,7 @@ as described in the following subsection.
 - ignite-kafka-connect-x.x.x-SNAPSHOT.jar
 - ignite-core-x.x.x-SNAPSHOT.jar
 - cache-api-1.0.0.jar
-- ignite-spring-1.5.0-SNAPSHOT.jar
+- ignite-spring-x.x.x-SNAPSHOT.jar
 - spring-aop-4.1.0.RELEASE.jar
 - spring-beans-4.1.0.RELEASE.jar
 - spring-context-4.1.0.RELEASE.jar
@@ -174,12 +174,12 @@ using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter.
 ```
 # connector
 name=ignite-src-connector
-connector.class=IgniteSourceConnector
+connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector
 tasks.max=2
 
 # cache
 topicNames=testTopic1,testTopic2
-cacheEvts=put,remove
+cacheEvts=put,removed
 ## if you decide to filter remotely (recommended)
 cacheFilterCls=MyFilter
 cacheName=cache1

http://git-wip-us.apache.org/repos/asf/ignite/blob/04eb6c96/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
----------------------------------------------------------------------
diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
index 9eb183c..0d312ca 100644
--- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
+++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java
@@ -29,7 +29,6 @@ import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.events.EventType;
 import org.apache.ignite.lang.IgniteBiPredicate;
@@ -76,7 +75,7 @@ public class IgniteSourceTask extends SourceTask {
     private static TaskLocalListener locLsnr = new TaskLocalListener();
 
     /** Remote filter. */
-    private static TaskRemoteFilter rmtLsnr = new TaskRemoteFilter();
+    private static TaskRemoteFilter rmtLsnr;
 
     /** User-defined filter. */
     private static IgnitePredicate<CacheEvent> filter;
@@ -131,6 +130,8 @@ public class IgniteSourceTask extends SourceTask {
             }
         }
 
+        rmtLsnr = new TaskRemoteFilter(cacheName);
+
         try {
             int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS));
 
@@ -251,12 +252,18 @@ public class IgniteSourceTask extends SourceTask {
         @IgniteInstanceResource
         Ignite ignite;
 
+        /** Cache name. */
+        private final String cacheName;
+
+        TaskRemoteFilter(String cacheName) {
+            this.cacheName = cacheName;
+        }
+
         @Override public boolean apply(CacheEvent evt) {
 
             Affinity affinity = ignite.affinity(cacheName);
-            ClusterNode evtNode = evt.eventNode();
 
-            if (affinity.isPrimary(evtNode, evt.key())) {
+            if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) {
                 // Process this event. Ignored on backups.
                 if (filter != null && filter.apply(evt))
                     return false;


Mime
View raw message