Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 75D5717650 for ; Tue, 7 Apr 2015 09:02:00 +0000 (UTC) Received: (qmail 10998 invoked by uid 500); 7 Apr 2015 09:02:00 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 10968 invoked by uid 500); 7 Apr 2015 09:02:00 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 10959 invoked by uid 99); 7 Apr 2015 09:02:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 09:02:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Tue, 07 Apr 2015 09:01:29 +0000 Received: (qmail 8079 invoked by uid 99); 7 Apr 2015 09:01:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Apr 2015 09:01:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 98B8DE2F4E; Tue, 7 Apr 2015 09:01:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sboikov@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 07 Apr 2015 09:02:00 -0000 Message-Id: <0951d38bb5754fe0985fc91ec113bd3e@git.apache.org> In-Reply-To: <35ca2aebfab746e6bb1211d0356a4f2a@git.apache.org> References: <35ca2aebfab746e6bb1211d0356a4f2a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [38/50] [abbrv] incubator-ignite git commit: # sprint-3 - Updated streaming examples. X-Virus-Checked: Checked by ClamAV on apache.org http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java deleted file mode 100644 index 5a36838..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/CacheConfig.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.marketdata; - -import org.apache.ignite.configuration.*; - -/** - * Configuration for the streaming caches for market data and financial instruments. - */ -public class CacheConfig { - /** - * Configure streaming cache for market ticks. - */ - public static CacheConfiguration marketTicksCache() { - return new CacheConfiguration<>("marketTicks"); - } - - /** - * Configure cache for financial instruments. - */ - public static CacheConfiguration instrumentCache() { - CacheConfiguration instCache = new CacheConfiguration<>("instCache"); - - // Index some fields for querying portfolio positions. - instCache.setIndexedTypes(String.class, Instrument.class); - - return instCache; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java deleted file mode 100644 index af9d0bf..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/Instrument.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.marketdata; - -import org.apache.ignite.cache.query.annotations.*; - -import java.io.*; - -/** - * Financial instrument. - */ -public class Instrument implements Serializable { - /** Instrument symbol. */ - @QuerySqlField(index = true) - private final String symbol; - - /** Open price. */ - @QuerySqlField(index = true) - private double open; - - /** High price. */ - private double high; - - /** Low price. */ - private double low = Long.MAX_VALUE; - - /** Close price. */ - @QuerySqlField(index = true) - private double latest; - - /** - * @param symbol Symbol. - */ - Instrument(String symbol) { - this.symbol = symbol; - } - - /** - * Updates this instrument based on the latest price. - * - * @param price Latest price. - */ - public void update(double price) { - if (open == 0) - open = price; - - high = Math.max(high, price); - low = Math.min(low, price); - this.latest = price; - } - - /** - * @return Symbol. - */ - public String symbol() { - return symbol; - } - - /** - * @return Open price. - */ - public double open() { - return open; - } - - /** - * @return High price. - */ - public double high() { - return high; - } - - /** - * @return Low price. - */ - public double low() { - return low; - } - - /** - * @return Close price. - */ - public double latest() { - return latest; - } - - /** {@inheritDoc} */ - @Override public synchronized String toString() { - return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java deleted file mode 100644 index 402e19e..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/QueryTopInstruments.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.marketdata; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.examples.*; - -import java.util.*; - -/** - * Periodically query popular numbers from the streaming cache. - * To start the example, you should: - *
    - *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • - *
  • Start streaming using {@link StreamMarketData}.
  • - *
  • Start querying top performing instruments using {@link QueryTopInstruments}.
  • - *
- *

- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class QueryTopInstruments { - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - IgniteCache instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); - - // Select top 3 instruments. - SqlFieldsQuery top3qry = new SqlFieldsQuery( - "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); - - // Select total profit. - SqlFieldsQuery profitQry = new SqlFieldsQuery("select sum(latest - open) from Instrument"); - - // Query top 3 best performing instruments every 5 seconds. - while (true) { - // Execute queries. - List> top3 = instCache.query(top3qry).getAll(); - List> profit = instCache.query(profitQry).getAll(); - - List row = profit.get(0); - - if (row.get(0) != null) - System.out.printf("Total profit: %.2f%n", row.get(0)); - - // Print top 10 words. - ExamplesUtils.printQueryResults(top3); - - Thread.sleep(5000); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java deleted file mode 100644 index 1eabfb3..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/marketdata/StreamMarketData.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.marketdata; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import java.util.*; - -/** - * Stream random numbers into the streaming cache. - * To start the example, you should: - *

    - *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • - *
  • Start streaming using {@link StreamMarketData}.
  • - *
  • Start querying top performing instruments using {@link QueryTopInstruments}.
  • - *
- *

- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamMarketData { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** The list of instruments. */ - private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; - - /** The list of initial instrument prices. */ - private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache mktCache = ignite.getOrCreateCache(CacheConfig.marketTicksCache()); - IgniteCache instCache = ignite.getOrCreateCache(CacheConfig.instrumentCache()); - - try (IgniteDataStreamer mktStmr = ignite.dataStreamer(mktCache.getName())) { - // Note that we receive market data, but do not populate 'mktCache' (it remains empty). - // Instead we update the instruments in the 'instCache'. - mktStmr.receiver(StreamVisitor.from((cache, e) -> { - String symbol = e.getKey(); - Double tick = e.getValue(); - - Instrument inst = instCache.get(symbol); - - if (inst == null) - inst = new Instrument(symbol); - - // Don't populate market cache, as we don't use it for querying. - // Update cached instrument based on the latest market tick. - inst.update(tick); - - instCache.put(symbol, inst); - })); - - // Stream market data into market data stream cache. - while (true) { - for (int j = 0; j < INSTRUMENTS.length; j++) { - // Use gaussian distribution to ensure that - // numbers closer to 0 have higher probability. - double price = round2(INITIAL_PRICES[j] + RAND.nextGaussian()); - - mktStmr.addData(INSTRUMENTS[j], price); - } - } - } - } - } - - /** - * Rounds double value to two significant signs. - * - * @param val value to be rounded. - * @return rounded double value. - */ - private static double round2(double val) { - return Math.floor(100 * val + 0.5) / 100; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java deleted file mode 100644 index 4de5fb6..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/CacheConfig.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.numbers; - -import org.apache.ignite.configuration.*; - -import javax.cache.configuration.*; -import javax.cache.expiry.*; - -import static java.util.concurrent.TimeUnit.*; - -/** - * Configuration for the streaming cache to store the stream of random numbers. - * This cache is configured with sliding window of 1 second, which means that - * data older than 1 second will be automatically removed from the cache. - */ -public class CacheConfig { - /** - * Configure streaming cache. - */ - public static CacheConfiguration randomNumbersCache() { - CacheConfiguration cfg = new CacheConfiguration<>("randomNumbers"); - - cfg.setIndexedTypes(Integer.class, Long.class); - - // Sliding window of 1 seconds. - cfg.setExpiryPolicyFactory(FactoryBuilder.factoryOf(new CreatedExpiryPolicy(new Duration(SECONDS, 1)))); - - return cfg; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java deleted file mode 100644 index 98fa410..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/QueryPopularNumbers.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.numbers; - -import org.apache.ignite.*; -import org.apache.ignite.cache.query.*; -import org.apache.ignite.examples.*; - -import java.util.*; - -/** - * Periodically query popular numbers from the streaming cache. - * To start the example, you should: - *

    - *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • - *
  • Start streaming using {@link StreamRandomNumbers}.
  • - *
  • Start querying popular numbers using {@link QueryPopularNumbers}.
  • - *
- *

- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class QueryPopularNumbers { - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); - - // Select top 10 words. - SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); - - // Select average, min, and max counts among all the words. - SqlFieldsQuery statsQry = new SqlFieldsQuery("select avg(_val), min(_val), max(_val) from Long"); - - // Query top 10 popular numbers every 5 seconds. - while (true) { - // Execute queries. - List> top10 = stmCache.query(top10Qry).getAll(); - List> stats = stmCache.query(statsQry).getAll(); - - // Print average count. - List row = stats.get(0); - - if (row.get(0) != null) - System.out.printf("Query results [avg=%.2f, min=%d, max=%d]%n", row.get(0), row.get(1), row.get(2)); - - // Print top 10 words. - ExamplesUtils.printQueryResults(top10); - - Thread.sleep(5000); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java deleted file mode 100644 index e52b674..0000000 --- a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/numbers/StreamRandomNumbers.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.java8.streaming.numbers; - -import org.apache.ignite.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.stream.*; - -import java.util.*; - -/** - * Stream random numbers into the streaming cache. - * To start the example, you should: - *

    - *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • - *
  • Start streaming using {@link StreamRandomNumbers}.
  • - *
  • Start querying popular numbers using {@link QueryPopularNumbers}.
  • - *
- *

- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class StreamRandomNumbers { - /** Random number generator. */ - private static final Random RAND = new Random(); - - /** Range within which to generate numbers. */ - private static final int RANGE = 1000; - - public static void main(String[] args) throws Exception { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); - - try (IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName())) { - // Allow data updates. - stmr.allowOverwrite(true); - - // Configure data transformation to count instances of the same number. - stmr.receiver(StreamTransformer.from((e, arg) -> { - Long val = e.getValue(); - - e.setValue(val == null ? 1L : val + 1); - - return null; - })); - - // Stream random numbers into the streamer cache. - while (true) - stmr.addData(RAND.nextInt(RANGE), 1L); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java new file mode 100644 index 0000000..4ea1245 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/transformers/StreamTransformerExample.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.transformers; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import java.util.*; + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + *

    + *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • + *
  • Start streaming using {@link StreamTransformerExample}.
  • + *
+ *

+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamTransformerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + CacheConfiguration cfg = new CacheConfiguration<>("randomNumbers"); + + // Index key and value. + cfg.setIndexedTypes(Integer.class, Long.class); + + // Auto-close cache at the end of the example. + try (IgniteCache stmCache = ignite.getOrCreateCache(cfg)) { + try (IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same number. + stmr.receiver(StreamTransformer.from((e, arg) -> { + // Get current count. + Long val = e.getValue(); + + // Increment count by 1. + e.setValue(val == null ? 1L : val + 1); + + return null; + })); + + // Stream 10 million of random numbers into the streamer cache. + for (int i = 1; i <= 10_000_000; i++) { + stmr.addData(RAND.nextInt(RANGE), 1L); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Query top 10 most popular numbers every. + SqlFieldsQuery top10Qry = new SqlFieldsQuery("select _key, _val from Long order by _val desc limit 10"); + + // Execute queries. + List> top10 = stmCache.query(top10Qry).getAll(); + + System.out.println("Top 10 most popular numbers:"); + + // Print top 10 words. + ExamplesUtils.printQueryResults(top10); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java new file mode 100644 index 0000000..009a7c3 --- /dev/null +++ b/examples/src/main/java8/org/apache/ignite/examples/java8/streaming/visitors/StreamVisitorExample.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.java8.streaming.visitors; + +import org.apache.ignite.*; +import org.apache.ignite.cache.query.*; +import org.apache.ignite.cache.query.annotations.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.stream.*; + +import java.io.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache. + * To start the example, you should: + *

    + *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • + *
  • Start streaming using {@link StreamVisitorExample}.
  • + *
+ *

+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class StreamVisitorExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** The list of instruments. */ + private static final String[] INSTRUMENTS = {"IBM", "GOOG", "MSFT", "GE", "EBAY", "YHOO", "ORCL", "CSCO", "AMZN", "RHT"}; + + /** The list of initial instrument prices. */ + private static final double[] INITIAL_PRICES = {194.9, 893.49, 34.21, 23.24, 57.93, 45.03, 44.41, 28.44, 378.49, 69.50}; + + public static void main(String[] args) throws Exception { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // Market data cache with default configuration. + CacheConfiguration mktDataCfg = new CacheConfiguration("marketTicks"); + + // Financial instrument cache configuration. + CacheConfiguration instCfg = new CacheConfiguration<>("instCache"); + + // Index key and value for querying financial instruments. + // Note that Instrument class has @QuerySqlField annotation for secondary field indexing. + instCfg.setIndexedTypes(String.class, Instrument.class); + + // Auto-close caches at the end of the example. + try ( + IgniteCache mktCache = ignite.getOrCreateCache(mktDataCfg); + IgniteCache instCache = ignite.getOrCreateCache(instCfg) + ) { + try (IgniteDataStreamer mktStmr = ignite.dataStreamer(mktCache.getName())) { + // Note that we receive market data, but do not populate 'mktCache' (it remains empty). + // Instead we update the instruments in the 'instCache'. + // Since both, 'instCache' and 'mktCache' use the same key, updates are collocated. + mktStmr.receiver(StreamVisitor.from((cache, e) -> { + String symbol = e.getKey(); + Double tick = e.getValue(); + + Instrument inst = instCache.get(symbol); + + if (inst == null) + inst = new Instrument(symbol); + + // Don't populate market cache, as we don't use it for querying. + // Update cached instrument based on the latest market tick. + inst.update(tick); + + instCache.put(symbol, inst); + })); + + // Stream 10 million market data ticks into the system. + for (int i = 1; i <= 10_000_000; i++) { + int idx = RAND.nextInt(INSTRUMENTS.length); + + // Use gaussian distribution to ensure that + // numbers closer to 0 have higher probability. + double price = round2(INITIAL_PRICES[idx] + RAND.nextGaussian()); + + mktStmr.addData(INSTRUMENTS[idx], price); + + if (i % 500_000 == 0) + System.out.println("Number of tuples streamed into Ignite: " + i); + } + } + + // Select top 3 best performing instruments. + SqlFieldsQuery top3qry = new SqlFieldsQuery( + "select symbol, (latest - open) from Instrument order by (latest - open) desc limit 3"); + + // Execute queries. + List> top3 = instCache.query(top3qry).getAll(); + + System.out.println("Top performing financial instruments: "); + + // Print top 10 words. + ExamplesUtils.printQueryResults(top3); + } + } + } + + /** + * Rounds double value to two significant signs. + * + * @param val value to be rounded. + * @return rounded double value. + */ + private static double round2(double val) { + return Math.floor(100 * val + 0.5) / 100; + } + + /** + * Financial instrument. + */ + public static class Instrument implements Serializable { + /** Instrument symbol. */ + @QuerySqlField(index = true) + private final String symbol; + + /** Open price. */ + @QuerySqlField(index = true) + private double open; + + /** Close price. */ + @QuerySqlField(index = true) + private double latest; + + /** + * @param symbol Symbol. + */ + Instrument(String symbol) { + this.symbol = symbol; + } + + /** + * Updates this instrument based on the latest price. + * + * @param price Latest price. + */ + public void update(double price) { + if (open == 0) + open = price; + + this.latest = price; + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + return "Instrument [symbol=" + symbol + ", latest=" + latest + ", change=" + (latest - open) + ']'; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b2984c9b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java new file mode 100644 index 0000000..45f05f0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/AffinityUuid.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.cache.affinity; + +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; + +/** + * Guaranteed unique affinity-based key. + */ +public class AffinityUuid extends AffinityKey { + /** + * Empty constructor. + */ + public AffinityUuid() { + // No-op. + } + + /** + * Constructs unique affinity UUID based on affinity key. + * + * @param affKey Affinity key to use for collocation. + */ + public AffinityUuid(Object affKey) { + super(IgniteUuid.randomUuid(), affKey); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(AffinityUuid.class, this); + } +}