ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject incubator-ignite git commit: # IGNITE-45 - Examples
Date Sat, 21 Mar 2015 20:41:26 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-45 9dde6c147 -> 629560086


# IGNITE-45 - Examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/62956008
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/62956008
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/62956008

Branch: refs/heads/ignite-45
Commit: 629560086e9face071aeb3dc4f945dc779de8041
Parents: 9dde6c1
Author: Valentin Kulichenko <vkulichenko@gridgain.com>
Authored: Sat Mar 21 13:43:00 2015 -0700
Committer: Valentin Kulichenko <vkulichenko@gridgain.com>
Committed: Sat Mar 21 13:43:00 2015 -0700

----------------------------------------------------------------------
 .../streaming/marketdata/StreamMarketData.java  |  8 ++---
 .../streaming/marketdata/StreamMarketData.java  |  2 +-
 .../org/apache/ignite/stream/StreamVisitor.java | 36 ++++++++++++--------
 3 files changed, 26 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
index 513a2e1..6d46f04 100644
--- a/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
+++ b/examples/src/main/java/org/apache/ignite/examples/java7/streaming/marketdata/StreamMarketData.java
@@ -18,9 +18,7 @@
 package org.apache.ignite.examples.java7.streaming.marketdata;
 
 import org.apache.ignite.*;
-import org.apache.ignite.examples.streaming.numbers.ExamplesUtils;
 import org.apache.ignite.examples.java7.*;
-import org.apache.ignite.lang.*;
 import org.apache.ignite.stream.*;
 
 import java.util.*;
@@ -64,9 +62,9 @@ public class StreamMarketData {
             try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) {
                 // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                 // Instead we update the instruments in the 'instCache'.
-                mktStmr.receiver(new StreamVisitor<>(new IgniteBiInClosure<IgniteCache<String, MarketTick>, Map.Entry<String, MarketTick>>() {
+                mktStmr.receiver(new StreamVisitor<String, MarketTick>() {
                     @Override
-                    public void apply(IgniteCache<String, MarketTick> mktCache, Map.Entry<String, MarketTick> e) {
+                    public void visit(IgniteCache<String, MarketTick> mktCache, Map.Entry<String, MarketTick> e) {
                         String symbol = e.getKey();
                         MarketTick tick = e.getValue();
 
@@ -81,7 +79,7 @@ public class StreamMarketData {
 
                         instCache.put(symbol, inst);
                     }
-                }));
+                });
 
                 // Stream market data into market data stream cache.
                 while (true) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
index ccec740..ed2a25a 100644
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
@@ -62,7 +62,7 @@ public class StreamMarketData {
             try (IgniteDataStreamer<String, MarketTick> mktStmr = ignite.dataStreamer(mktCache.getName())) {
                 // Note that we receive market data, but do not populate 'mktCache' (it remains empty).
                 // Instead we update the instruments in the 'instCache'.
-                mktStmr.receiver(new StreamVisitor<>((cache, e) -> {
+                mktStmr.receiver(StreamVisitor.from((cache, e) -> {
                     String symbol = e.getKey();
                     MarketTick tick = e.getValue();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/62956008/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
index 0474278..105607a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamVisitor.java
@@ -27,27 +27,35 @@ import java.util.*;
  * does not update the cache. If the tuple needs to be stored in the cache,
  * then {@code cache.put(...)} should be called explicitely.
  */
-public class StreamVisitor<K, V> implements StreamReceiver<K, V> {
+public abstract class StreamVisitor<K, V> implements StreamReceiver<K, V> {
     /** */
     private static final long serialVersionUID = 0L;
 
-    /** Tuple visitor. */
-    private IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis;
+    /** {@inheritDoc} */
+    @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException {
+        for (Map.Entry<K, V> entry : entries)
+            visit(cache, entry);
+    }
 
     /**
-     * Visitor to visit every stream key-value tuple. Note, that the visitor
-     * does not update the cache. If the tuple needs to be stored in the cache,
-     * then {@code cache.put(...)} should be called explicitely.
+     * Visits one cache entry.
      *
-     * @param vis Stream key-value tuple visitor.
+     * @param cache Cache.
+     * @param entry Visited entry.
      */
-    public StreamVisitor(IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> vis) {
-        this.vis = vis;
-    }
+    protected abstract void visit(IgniteCache<K, V> cache, Map.Entry<K, V> entry);
 
-    /** {@inheritDoc} */
-    @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException {
-        for (Map.Entry<K, V> entry : entries)
-            vis.apply(cache, entry);
+    /**
+     * Creates a new visitor based on instance of {@link IgniteBiInClosure}.
+     *
+     * @param c Closure.
+     * @return Stream visitor.
+     */
+    public static <K, V> StreamVisitor<K, V> from(final IgniteBiInClosure<IgniteCache<K, V>, Map.Entry<K, V>> c) {
+        return new StreamVisitor<K, V>() {
+            @Override protected void visit(IgniteCache<K, V> cache, Map.Entry<K, V> entry) {
+                c.apply(cache, entry);
+            }
+        };
     }
 }


Mime
View raw message