ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a.@apache.org
Subject [27/50] [abbrv] ignite git commit: IGNITE-2756 - Fixed StreamVisitorExample
Date Mon, 21 Mar 2016 14:20:52 GMT
IGNITE-2756 - Fixed StreamVisitorExample


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/31df63b5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/31df63b5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/31df63b5

Branch: refs/heads/ignite-2801
Commit: 31df63b5d59bbfc43799187908cf8a47262b3f2b
Parents: 52c134d
Author: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Authored: Wed Mar 9 18:53:30 2016 -0800
Committer: Valentin Kulichenko <valentin.kulichenko@gmail.com>
Committed: Wed Mar 9 22:52:22 2016 -0800

----------------------------------------------------------------------
 .../streaming/StreamVisitorExample.java         | 38 ++++++++++++++------
 1 file changed, 28 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/31df63b5/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
index 819cfea..c3d8c64 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
@@ -35,12 +35,16 @@ import org.apache.ignite.examples.ExamplesUtils;
 import org.apache.ignite.stream.StreamVisitor;
 
 /**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
- *     <li>Start streaming using {@link StreamVisitorExample}.</li>
- * </ul>
+ * This examples demonstrates the stream visitor which allows to customize the processing
+ * of the streamed data on the server side. Instead of populating the cache for which the
+ * streamer is created, we will calculate aggregated data on the fly and save results in
+ * another cache.
+ * <p>
+ * Remote nodes should always be started with special configuration file which
+ * enables P2P class loading: {@code 'ignite.{sh|bat} examples/config/example-ignite.xml'}.
+ * <p>
+ * Alternatively you can run {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
  */
 public class StreamVisitorExample {
     /** Random number generator. */
@@ -52,6 +56,10 @@ public class StreamVisitorExample {
     /** The list of initial instrument prices. */
     private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03,
44.41, 28.44, 378.49, 69.50};
 
+    /** Caches' names. */
+    private static final String INSTRUMENTS_CACHE_NAME = "instCache";
+    private static final String MARKET_TICKS_CACHE_NAME = "marketTicks";
+
     public static void main(String[] args) throws Exception {
         // Mark this cluster member as client.
         Ignition.setClientMode(true);
@@ -61,7 +69,7 @@ public class StreamVisitorExample {
                 return;
 
             // Financial instrument cache configuration.
-            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");
+            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>(INSTRUMENTS_CACHE_NAME);
 
             // Index key and value for querying financial instruments.
             // Note that Instrument class has @QuerySqlField annotation for secondary field
indexing.
@@ -69,10 +77,14 @@ public class StreamVisitorExample {
 
             // Auto-close caches at the end of the example.
             try (
-                IgniteCache<String, Double> mktCache = ignite.getOrCreateCache("marketTicks");
// Default config.
+                IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(MARKET_TICKS_CACHE_NAME);
// Default config.
                 IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
             ) {
                 try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName()))
{
+                    // To achieve proper indexing we should use fully-qualified name
+                    // of the class as a type name when binary object is created.
+                    final String instTypeName = Instrument.class.getName();
+
                     // Note that we receive market data, but do not populate 'mktCache' (it
remains empty).
                     // Instead we update the instruments in the 'instCache'.
                     // Since both, 'instCache' and 'mktCache' use the same key, updates are
collocated.
@@ -81,14 +93,15 @@ public class StreamVisitorExample {
                             String symbol = e.getKey();
                             Double tick = e.getValue();
 
-                            IgniteCache<String, BinaryObject> binInstCache = ignite.cache("instCache").withKeepBinary();
+                            IgniteCache<String, BinaryObject> binInstCache =
+                                ignite.cache(INSTRUMENTS_CACHE_NAME).withKeepBinary();
 
                             BinaryObject inst = binInstCache.get(symbol);
 
                             BinaryObjectBuilder instBuilder;
 
                             if (inst == null) {
-                                instBuilder = ignite.binary().builder("Instrument");
+                                instBuilder = ignite.binary().builder(instTypeName);
 
                                 // Constructor logic.
                                 instBuilder.setField(
@@ -140,6 +153,11 @@ public class StreamVisitorExample {
                 // Print top 10 words.
                 ExamplesUtils.printQueryResults(top3);
             }
+            finally {
+                // Distributed cache could be removed from cluster only by #destroyCache()
call.
+                ignite.destroyCache(INSTRUMENTS_CACHE_NAME);
+                ignite.destroyCache(MARKET_TICKS_CACHE_NAME);
+            }
         }
     }
 


Mime
View raw message