ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vkuliche...@apache.org
Subject [10/50] [abbrv] incubator-ignite git commit: # sprint-3 - Updated streaming examples.
Date Wed, 08 Apr 2015 01:52:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
deleted file mode 100644
index 5a36838..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.marketdata;
-
-import org.apache.ignite.configuration.*;
-
-/**
- * Configuration for the streaming caches for market data and financial instruments.
- */
-public class CacheConfig {
-    /**
-     * Configure streaming cache for market ticks.
-     */
-    public static CacheConfiguration<String, Double> marketTicksCache() {
-        return new CacheConfiguration<>("marketTicks");
-    }
-
-    /**
-     * Configure cache for financial instruments.
-     */
-    public static CacheConfiguration<String, Instrument> instrumentCache() {
-        CacheConfiguration<String, Instrument> instCache = new CacheConfiguration<>("instCache");
-
-        // Index some fields for querying portfolio positions.
-        instCache.setIndexedTypes(String.class, Instrument.class);
-
-        return instCache;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
deleted file mode 100644
index af9d0bf..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.marketdata;
-
-import org.apache.ignite.cache.query.annotations.*;
-
-import java.io.*;
-
-/**
- * Financial instrument.
- */
-public class Instrument implements Serializable {
-    /** Instrument symbol. */
-    @QuerySqlField(index = true)
-    private final String symbol;
-
-    /** Open price. */
-    @QuerySqlField(index = true)
-    private double open;
-
-    /** High price. */
-    private double high;
-
-    /** Low price. */
-    private double low = Long.MAX_VALUE;
-
-    /** Close price. */
-    @QuerySqlField(index = true)
-    private double latest;
-
-    /**
-     * @param symbol Symbol.
-     */
-    Instrument(String symbol) {
-        this.symbol = symbol;
-    }
-
-    /**
-     * Updates this instrument based on the latest price.
-     *
-     * @param price Latest price.
-     */
-    public void update(double price) {
-        if (open == 0)
-            open = price;
-
-        high = Math.max(high, price);
-        low = Math.min(low, price);
-        this.latest = price;
-    }
-
-    /**
-     * @return Symbol.
-     */
-    public String symbol() {
-        return symbol;
-    }
-
-    /**
-     * @return Open price.
-     */
-    public double open() {
-        return open;
-    }
-
-    /**
-     * @return High price.
-     */
-    public double high() {
-        return high;
-    }
-
-    /**
-     * @return Low price.
-     */
-    public double low() {
-        return low;
-    }
-
-    /**
-     * @return Close price.
-     */
-    public double latest() {
-        return latest;
-    }
-
-    /** {@inheritDoc} */
-    @Override public synchronized String toString() {
-        return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest
- open) + ']';
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
deleted file mode 100644
index 402e19e..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.marketdata;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.examples.*;
-
-import java.util.*;
-
-/**
- * Periodically query popular numbers from the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote
nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamMarketData}.</li>
- *     <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class QueryTopInstruments {
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache());
-
-            // Select top 3 instruments.
-            SqlFieldsQuery top3qry = new SqlFieldsQuery(
-                "select symbol, (latest - open) from Instrument order by (latest - open)
desc limit 3");
-
-            // Select total profit.
-            SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from
Instrument");
-
-            // Query top 3 best performing instruments every 5 seconds.
-            while (true) {
-                // Execute queries.
-                List<List<?>> top3 = instCache.query(top3qry).getAll();
-                List<List<?>> profit = instCache.query(profitQry).getAll();
-
-                List<?> row = profit.get(0);
-
-                if (row.get(0) != null)
-                    System.out.printf("Total profit: %.2f%n", row.get(0));
-
-                // Print top 10 words.
-                ExamplesUtils.printQueryResults(top3);
-
-                Thread.sleep(5000);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
deleted file mode 100644
index 1eabfb3..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.marketdata;
-
-import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.stream.*;
-
-import java.util.*;
-
-/**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote
nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamMarketData}.</li>
- *     <li>Start querying top performing instruments using {@link QueryTopInstruments}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class StreamMarketData {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** The list of instruments. */
-    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO",
"ORCL", "CSCO", "AMZN", "RHT"};
-
-    /** The list of initial instrument prices. */
-    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03,
44.41, 28.44, 378.49, 69.50};
-
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of the streaming
data.
-            IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache());
-            IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache());
-
-            try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName()))
{
-                // Note that we receive market data, but do not populate 'mktCache' (it remains
empty).
-                // Instead we update the instruments in the 'instCache'.
-                mktStmr.receiver(StreamVisitor.from((cache, e) -> {
-                    String symbol = e.getKey();
-                    Double tick = e.getValue();
-
-                    Instrument inst = instCache.get(symbol);
-
-                    if (inst == null)
-                        inst = new Instrument(symbol);
-
-                    // Don't populate market cache, as we don't use it for querying.
-                    // Update cached instrument based on the latest market tick.
-                    inst.update(tick);
-
-                    instCache.put(symbol, inst);
-                }));
-
-                // Stream market data into market data stream cache.
-                while (true) {
-                    for (int j = 0; j < INSTRUMENTS.length; j++) {
-                        // Use gaussian distribution to ensure that
-                        // numbers closer to 0 have higher probability.
-                        double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian());
-
-                        mktStmr.addData(INSTRUMENTS[j], price);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
-     * Rounds double value to two significant signs.
-     *
-     * @param val value to be rounded.
-     * @return rounded double value.
-     */
-    private static double round2(double val) {
-        return Math.floor(100 * val + 0.5) / 100;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
deleted file mode 100644
index 4de5fb6..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.numbers;
-
-import org.apache.ignite.configuration.*;
-
-import javax.cache.configuration.*;
-import javax.cache.expiry.*;
-
-import static java.util.concurrent.TimeUnit.*;
-
-/**
- * Configuration for the streaming cache to store the stream of random numbers.
- * This cache is configured with sliding window of 1 second, which means that
- * data older than 1 second will be automatically removed from the cache.
- */
-public class CacheConfig {
-    /**
-     * Configure streaming cache.
-     */
-    public static CacheConfiguration<Integer, Long> randomNumbersCache() {
-        CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers");
-
-        cfg.setIndexedTypes(Integer.class, Long.class);
-
-        // Sliding window of 1 seconds.
-        cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS,
1))));
-
-        return cfg;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
deleted file mode 100644
index 98fa410..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.numbers;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.query.*;
-import org.apache.ignite.examples.*;
-
-import java.util.*;
-
-/**
- * Periodically query popular numbers from the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote
nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamRandomNumbers}.</li>
- *     <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class QueryPopularNumbers {
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of the streaming
data.
-            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            // Select top 10 words.
-            SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order
by _val desc limit 10");
-
-            // Select average, min, and max counts among all the words.
-            SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), min(_val), max(_val)
from Long");
-
-            // Query top 10 popular numbers every 5 seconds.
-            while (true) {
-                // Execute queries.
-                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
-                List<List<?>> stats = stmCache.query(statsQry).getAll();
-
-                // Print average count.
-                List<?> row = stats.get(0);
-
-                if (row.get(0) != null)
-                    System.out.printf("Query results [avg=%.2f, min=%d, max=%d]%n", row.get(0),
row.get(1), row.get(2));
-
-                // Print top 10 words.
-                ExamplesUtils.printQueryResults(top10);
-
-                Thread.sleep(5000);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
deleted file mode 100644
index e52b674..0000000
--- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.java8.streaming.numbers;
-
-import org.apache.ignite.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.stream.*;
-
-import java.util.*;
-
-/**
- * Stream random numbers into the streaming cache.
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote
nodes as specified below.</li>
- *     <li>Start streaming using {@link StreamRandomNumbers}.</li>
- *     <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class StreamRandomNumbers {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
-    public static void main(String[] args) throws Exception {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of the streaming
data.
-            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName()))
{
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the same number.
-                stmr.receiver(StreamTransformer.from((e, arg) -> {
-                    Long val = e.getValue();
-
-                    e.setValue(val == null ? 1L : val + 1);
-
-                    return null;
-                }));
-
-                // Stream random numbers into the streamer cache.
-                while (true)
-                    stmr.addData(RAND.nextInt(RANGE), 1L);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java
new file mode 100644
index 0000000..4ea1245
--- /dev/null
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.transformers;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.stream.*;
+
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote
nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamTransformerExample}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class StreamTransformerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            CacheConfiguration<Integer, Long> cfg = new CacheConfiguration<>("randomNumbers");
+
+            // Index key and value.
+            cfg.setIndexedTypes(Integer.class, Long.class);
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(cfg))
{
+                try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName()))
{
+                    // Allow data updates.
+                    stmr.allowOverwrite(true);
+
+                    // Configure data transformation to count instances of the same number.
+                    stmr.receiver(StreamTransformer.from((e, arg) -> {
+                        // Get current count.
+                        Long val = e.getValue();
+
+                        // Increment count by 1.
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }));
+
+                    // Stream 10 million of random numbers into the streamer cache.
+                    for (int i = 1; i <= 10_000_000; i++) {
+                        stmr.addData(RAND.nextInt(RANGE), 1L);
+
+                        if (i % 500_000 == 0)
+                            System.out.println("Number of tuples streamed into Ignite: "
+ i);
+                    }
+                }
+
+                // Query top 10 most popular numbers every.
+                SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long
order by _val desc limit 10");
+
+                // Execute queries.
+                List<List<?>> top10 = stmCache.query(top10Qry).getAll();
+
+                System.out.println("Top 10 most popular numbers:");
+
+                // Print top 10 words.
+                ExamplesUtils.printQueryResults(top10);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java
b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java
new file mode 100644
index 0000000..009a7c3
--- /dev/null
+++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.java8.streaming.visitors;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.query.*;
+import org.apache.ignite.cache.query.annotations.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.stream.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache.
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote
nodes as specified below.</li>
+ *     <li>Start streaming using {@link StreamVisitorExample}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class StreamVisitorExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** The list of instruments. */
+    private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO",
"ORCL", "CSCO", "AMZN", "RHT"};
+
+    /** The list of initial instrument prices. */
+    private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03,
44.41, 28.44, 378.49, 69.50};
+
+    public static void main(String[] args) throws Exception {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // Market data cache with default configuration.
+            CacheConfiguration<String, Double> mktDataCfg = new CacheConfiguration<String,
Double>("marketTicks");
+
+            // Financial instrument cache configuration.
+            CacheConfiguration<String, Instrument> instCfg = new CacheConfiguration<>("instCache");
+
+            // Index key and value for querying financial instruments.
+            // Note that Instrument class has @QuerySqlField annotation for secondary field
indexing.
+            instCfg.setIndexedTypes(String.class, Instrument.class);
+
+            // Auto-close caches at the end of the example.
+            try (
+                IgniteCache<String, Double> mktCache = ignite.getOrCreateCache(mktDataCfg);
+                IgniteCache<String, Instrument> instCache = ignite.getOrCreateCache(instCfg)
+            ) {
+                try (IgniteDataStreamer<String, Double> mktStmr = ignite.dataStreamer(mktCache.getName()))
{
+                    // Note that we receive market data, but do not populate 'mktCache' (it
remains empty).
+                    // Instead we update the instruments in the 'instCache'.
+                    // Since both, 'instCache' and 'mktCache' use the same key, updates are
collocated.
+                    mktStmr.receiver(StreamVisitor.from((cache, e) -> {
+                        String symbol = e.getKey();
+                        Double tick = e.getValue();
+
+                        Instrument inst = instCache.get(symbol);
+
+                        if (inst == null)
+                            inst = new Instrument(symbol);
+
+                        // Don't populate market cache, as we don't use it for querying.
+                        // Update cached instrument based on the latest market tick.
+                        inst.update(tick);
+
+                        instCache.put(symbol, inst);
+                    }));
+
+                    // Stream 10 million market data ticks into the system.
+                    for (int i = 1; i <= 10_000_000; i++) {
+                        int idx = RAND.nextInt(INSTRUMENTS.length);
+
+                        // Use gaussian distribution to ensure that
+                        // numbers closer to 0 have higher probability.
+                        double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian());
+
+                        mktStmr.addData(INSTRUMENTS[idx], price);
+
+                        if (i % 500_000 == 0)
+                            System.out.println("Number of tuples streamed into Ignite: "
+ i);
+                    }
+                }
+
+                // Select top 3 best performing instruments.
+                SqlFieldsQuery top3qry = new SqlFieldsQuery(
+                    "select symbol, (latest - open) from Instrument order by (latest - open)
desc limit 3");
+
+                // Execute queries.
+                List<List<?>> top3 = instCache.query(top3qry).getAll();
+
+                System.out.println("Top performing financial instruments: ");
+
+                // Print top 10 words.
+                ExamplesUtils.printQueryResults(top3);
+            }
+        }
+    }
+
+    /**
+     * Rounds double value to two significant signs.
+     *
+     * @param val value to be rounded.
+     * @return rounded double value.
+     */
+    private static double round2(double val) {
+        return Math.floor(100 * val + 0.5) / 100;
+    }
+
+    /**
+     * Financial instrument.
+     */
+    public static class Instrument implements Serializable {
+        /** Instrument symbol. */
+        @QuerySqlField(index = true)
+        private final String symbol;
+
+        /** Open price. */
+        @QuerySqlField(index = true)
+        private double open;
+
+        /** Close price. */
+        @QuerySqlField(index = true)
+        private double latest;
+
+        /**
+         * @param symbol Symbol.
+         */
+        Instrument(String symbol) {
+            this.symbol = symbol;
+        }
+
+        /**
+         * Updates this instrument based on the latest price.
+         *
+         * @param price Latest price.
+         */
+        public void update(double price) {
+            if (open == 0)
+                open = price;
+
+            this.latest = price;
+        }
+
+        /** {@inheritDoc} */
+        @Override public synchronized String toString() {
+            return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" +
(latest - open) + ']';
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java
b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java
new file mode 100644
index 0000000..45f05f0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.affinity;
+
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+/**
+ * Guaranteed unique affinity-based key.
+ */
+public class AffinityUuid extends AffinityKey<IgniteUuid> {
+    /**
+     * Empty constructor.
+     */
+    public AffinityUuid() {
+        // No-op.
+    }
+
+    /**
+     * Constructs unique affinity UUID based on affinity key.
+     *
+     * @param affKey Affinity key to use for collocation.
+     */
+    public AffinityUuid(Object affKey) {
+        super(IgniteUuid.randomUuid(), affKey);
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(AffinityUuid.class, this);
+    }
+}


Mime
View raw message