Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5994418B2C for ; Fri, 1 Apr 2016 01:11:28 +0000 (UTC) Received: (qmail 59675 invoked by uid 500); 1 Apr 2016 01:11:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 59639 invoked by uid 500); 1 Apr 2016 01:11:28 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 59630 invoked by uid 99); 1 Apr 2016 01:11:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Apr 2016 01:11:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 1A518DFC73; Fri, 1 Apr 2016 01:11:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: shroman@apache.org To: commits@ignite.apache.org Message-Id: <90990184e6f24c04b680720c23adf92a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: ignite git commit: Fix for IGNITE-2730: Remote filter does not know cache name. Date: Fri, 1 Apr 2016 01:11:28 +0000 (UTC) Repository: ignite Updated Branches: refs/heads/master bd509bec4 -> 04eb6c966 Fix for IGNITE-2730: Remote filter does not know cache name. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/04eb6c96 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/04eb6c96 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/04eb6c96 Branch: refs/heads/master Commit: 04eb6c966c6f8a6e59d99e52e5967610d10ad9f7 Parents: bd509be Author: shtykh_roman Authored: Fri Apr 1 10:10:48 2016 +0900 Committer: shtykh_roman Committed: Fri Apr 1 10:10:48 2016 +0900 ---------------------------------------------------------------------- modules/kafka/README.txt | 10 +++++----- .../stream/kafka/connect/IgniteSourceTask.java | 15 +++++++++++---- 2 files changed, 16 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/04eb6c96/modules/kafka/README.txt ---------------------------------------------------------------------- diff --git a/modules/kafka/README.txt b/modules/kafka/README.txt index 3a1a5aa..259a4f6 100644 --- a/modules/kafka/README.txt +++ b/modules/kafka/README.txt @@ -77,7 +77,7 @@ offset.flush.interval.ms=10000 ``` # connector name=string-ignite-connector -connector.class=IgniteSinkConnector +connector.class=org.apache.ignite.stream.kafka.connect.IgniteSinkConnector tasks.max=2 topics=testTopic1,testTopic2 @@ -114,7 +114,7 @@ bin/kafka-server-start.sh config/server.properties 3. Provide some data input to the Kafka server ``` -bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --operty key.separator=, +bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test --property parse.key=true --property key.separator=, k1,v1 ``` @@ -141,7 +141,7 @@ as described in the following subsection. - ignite-kafka-connect-x.x.x-SNAPSHOT.jar - ignite-core-x.x.x-SNAPSHOT.jar - cache-api-1.0.0.jar -- ignite-spring-1.5.0-SNAPSHOT.jar +- ignite-spring-x.x.x-SNAPSHOT.jar - spring-aop-4.1.0.RELEASE.jar - spring-beans-4.1.0.RELEASE.jar - spring-context-4.1.0.RELEASE.jar @@ -174,12 +174,12 @@ using org.apache.ignite.stream.kafka.connect.serialization.CacheEventConverter. ``` # connector name=ignite-src-connector -connector.class=IgniteSourceConnector +connector.class=org.apache.ignite.stream.kafka.connect.IgniteSourceConnector tasks.max=2 # cache topicNames=testTopic1,testTopic2 -cacheEvts=put,remove +cacheEvts=put,removed ## if you decide to filter remotely (recommended) cacheFilterCls=MyFilter cacheName=cache1 http://git-wip-us.apache.org/repos/asf/ignite/blob/04eb6c96/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java ---------------------------------------------------------------------- diff --git a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java index 9eb183c..0d312ca 100644 --- a/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java +++ b/modules/kafka/src/main/java/org/apache/ignite/stream/kafka/connect/IgniteSourceTask.java @@ -29,7 +29,6 @@ import org.apache.ignite.Ignite; import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.events.EventType; import org.apache.ignite.lang.IgniteBiPredicate; @@ -76,7 +75,7 @@ public class IgniteSourceTask extends SourceTask { private static TaskLocalListener locLsnr = new TaskLocalListener(); /** Remote filter. */ - private static TaskRemoteFilter rmtLsnr = new TaskRemoteFilter(); + private static TaskRemoteFilter rmtLsnr; /** User-defined filter. */ private static IgnitePredicate filter; @@ -131,6 +130,8 @@ public class IgniteSourceTask extends SourceTask { } } + rmtLsnr = new TaskRemoteFilter(cacheName); + try { int[] evts = cacheEvents(props.get(IgniteSourceConstants.CACHE_EVENTS)); @@ -251,12 +252,18 @@ public class IgniteSourceTask extends SourceTask { @IgniteInstanceResource Ignite ignite; + /** Cache name. */ + private final String cacheName; + + TaskRemoteFilter(String cacheName) { + this.cacheName = cacheName; + } + @Override public boolean apply(CacheEvent evt) { Affinity affinity = ignite.affinity(cacheName); - ClusterNode evtNode = evt.eventNode(); - if (affinity.isPrimary(evtNode, evt.key())) { + if (affinity.isPrimary(ignite.cluster().localNode(), evt.key())) { // Process this event. Ignored on backups. if (filter != null && filter.apply(evt)) return false;