Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A8FB5196E7 for ; Mon, 21 Mar 2016 14:20:27 +0000 (UTC) Received: (qmail 18456 invoked by uid 500); 21 Mar 2016 14:20:27 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 18401 invoked by uid 500); 21 Mar 2016 14:20:27 -0000 Mailing-List: contact commits-help@ignite.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.apache.org Delivered-To: mailing list commits@ignite.apache.org Received: (qmail 18290 invoked by uid 99); 21 Mar 2016 14:20:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 21 Mar 2016 14:20:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E3DBDFA43; Mon, 21 Mar 2016 14:20:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: av@apache.org To: commits@ignite.apache.org Date: Mon, 21 Mar 2016 14:20:52 -0000 Message-Id: <6e4b68434276497abe59492e9a63df0d@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [27/50] [abbrv] ignite git commit: IGNITE-2756 - Fixed StreamVisitorExample IGNITE-2756 - Fixed StreamVisitorExample Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31df63b5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31df63b5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31df63b5 Branch: refs/heads/ignite-2801 Commit: 31df63b5d59bbfc43799187908cf8a47262b3f2b Parents: 52c134d Author: Valentin Kulichenko Authored: Wed Mar 9 18:53:30 2016 -0800 Committer: Valentin Kulichenko Committed: Wed Mar 9 22:52:22 2016 -0800 ---------------------------------------------------------------------- .../streaming/StreamVisitorExample.java | 38 ++++++++++++++------ 1 file changed, 28 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/31df63b5/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java index 819cfea..c3d8c64 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java @@ -35,12 +35,16 @@ import org.apache.ignite.examples.ExamplesUtils; import org.apache.ignite.stream.StreamVisitor; /** - * Stream random numbers into the streaming cache. - * To start the example, you should: - *
    - *
  • Start a few nodes using {@link ExampleNodeStartup}.
  • - *
  • Start streaming using {@link StreamVisitorExample}.
  • - *
+ * This examples demonstrates the stream visitor which allows to customize the processing + * of the streamed data on the server side. Instead of populating the cache for which the + * streamer is created, we will calculate aggregated data on the fly and save results in + * another cache. + *

+ * Remote nodes should always be started with special configuration file which + * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}. + *

+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. */ public class StreamVisitorExample { /** Random number generator. */ @@ -52,6 +56,10 @@ public class StreamVisitorExample { /** The list of initial instrument prices. */ private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + /** Caches' names. */ + private static final String INSTRUMENTS_CACHE_NAME = "instCache"; + private static final String MARKET_TICKS_CACHE_NAME = "marketTicks"; + public static void main(String[] args) throws Exception { // Mark this cluster member as client. Ignition.setClientMode(true); @@ -61,7 +69,7 @@ public class StreamVisitorExample { return; // Financial instrument cache configuration. - CacheConfiguration instCfg = new CacheConfiguration<>("instCache"); + CacheConfiguration instCfg = new CacheConfiguration<>(INSTRUMENTS_CACHE_NAME); // Index key and value for querying financial instruments. // Note that Instrument class has @QuerySqlField annotation for secondary field indexing. @@ -69,10 +77,14 @@ public class StreamVisitorExample { // Auto-close caches at the end of the example. try ( - IgniteCache mktCache = ignite.getOrCreateCache("marketTicks"); // Default config. + IgniteCache mktCache = ignite.getOrCreateCache(MARKET_TICKS_CACHE_NAME); // Default config. IgniteCache instCache = ignite.getOrCreateCache(instCfg) ) { try (IgniteDataStreamer mktStmr = ignite.dataStreamer(mktCache.getName())) { + // To achieve proper indexing we should use fully-qualified name + // of the class as a type name when binary object is created. + final String instTypeName = Instrument.class.getName(); + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). // Instead we update the instruments in the 'instCache'. // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. @@ -81,14 +93,15 @@ public class StreamVisitorExample { String symbol = e.getKey(); Double tick = e.getValue(); - IgniteCache binInstCache = ignite.cache("instCache").withKeepBinary(); + IgniteCache binInstCache = + ignite.cache(INSTRUMENTS_CACHE_NAME).withKeepBinary(); BinaryObject inst = binInstCache.get(symbol); BinaryObjectBuilder instBuilder; if (inst == null) { - instBuilder = ignite.binary().builder("Instrument"); + instBuilder = ignite.binary().builder(instTypeName); // Constructor logic. instBuilder.setField( @@ -140,6 +153,11 @@ public class StreamVisitorExample { // Print top 10 words. ExamplesUtils.printQueryResults(top3); } + finally { + // Distributed cache could be removed from cluster only by #destroyCache() call. + ignite.destroyCache(INSTRUMENTS_CACHE_NAME); + ignite.destroyCache(MARKET_TICKS_CACHE_NAME); + } } }