Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 9365C18239 for ; Thu, 21 May 2015 02:11:28 +0000 (UTC) Received: (qmail 37451 invoked by uid 500); 21 May 2015 02:11:28 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 37421 invoked by uid 500); 21 May 2015 02:11:28 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 37403 invoked by uid 99); 21 May 2015 02:11:28 -0000 Received: from Unknown (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 May 2015 02:11:28 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id D7E251A3799 for ; Thu, 21 May 2015 02:11:27 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 1.791 X-Spam-Level: * X-Spam-Status: No, score=1.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-us-west.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id NN4mbEjs6h4G for ; Thu, 21 May 2015 02:11:12 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-us-west.apache.org (ASF Mail Server at mx1-us-west.apache.org) with SMTP id D394A262DA for ; Thu, 21 May 2015 02:10:58 +0000 (UTC) Received: (qmail 35558 invoked by uid 99); 21 May 2015 02:10:58 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 21 May 2015 02:10:58 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 4C4BBE4432; Thu, 21 May 2015 02:10:58 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Thu, 21 May 2015 02:11:10 -0000 Message-Id: <234416f9264f48c69264ac47076ae1ea@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [13/50] [abbrv] incubator-ignite git commit: ignite-430 Words count Socket streamer examples ignite-430 Words count Socket streamer examples Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7ee85179 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7ee85179 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7ee85179 Branch: refs/heads/ignite-80 Commit: 7ee85179103df637ba594fab68755dee71b69997 Parents: d87efce Author: agura Authored: Wed May 13 20:56:22 2015 +0300 Committer: agura Committed: Fri May 15 03:44:34 2015 +0300 ---------------------------------------------------------------------- .../ignite/examples/streaming/package-info.java | 1 - .../streaming/socket/SocketStreamerExample.java | 128 -------- .../socket/WordsSocketStreamerClient.java | 86 +++++ .../socket/WordsSocketStreamerServer.java | 93 ++++++ .../socket/ZStringsSocketStreamerExample.java | 141 --------- .../socket/ZWordsSocketStreamerClient.java | 81 +++++ .../socket/ZWordsSocketStreamerServer.java | 111 +++++++ .../examples/streaming/socket/package-info.java | 3 +- .../streaming/wordcount/CacheConfig.java | 2 +- .../streaming/wordcount/QueryWords.java | 2 +- .../streaming/wordcount/StreamWords.java | 2 +- .../streaming/wordcount/package-info.java | 1 - .../org/apache/ignite/stream/StreamAdapter.java | 111 +++++++ .../ignite/stream/StreamTupleExtractor.java | 33 ++ .../ignite/stream/adapters/StreamAdapter.java | 111 ------- .../stream/adapters/StreamTupleExtractor.java | 33 -- .../ignite/stream/adapters/package-info.java | 21 -- .../stream/socket/IgniteSocketStreamer.java | 217 ------------- .../ignite/stream/socket/SocketStreamer.java | 218 +++++++++++++ .../socket/IgniteSocketStreamerSelfTest.java | 315 ------------------- .../stream/socket/SocketStreamerSelfTest.java | 315 +++++++++++++++++++ .../testsuites/IgniteStreamTestSuite.java | 2 +- 22 files changed, 1053 insertions(+), 974 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java index 43dea13..43fbab3 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java @@ -16,7 +16,6 @@ */ /** - * * Demonstrates usage of data streamer. */ package org.apache.ignite.examples.streaming; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java deleted file mode 100644 index 487572a..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.socket; - -import org.apache.ignite.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.examples.streaming.wordcount.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.stream.adapters.*; -import org.apache.ignite.stream.socket.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol. - *

- * To start the example, you should: - *

    - *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • - *
  • Start streaming using {@link SocketStreamerExample}.
  • - *
  • Start querying popular numbers using {@link QueryWords}.
  • - *
- *

- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class SocketStreamerExample { - /** Port. */ - private static final int PORT = 5555; - - /** - * @param args Args. - */ - public static void main(String[] args) throws InterruptedException, IOException { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.wordCache()); - - try (IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName())) { - InetAddress addr = InetAddress.getLocalHost(); - - // Configure socket streamer - IgniteSocketStreamer sockStmr = new IgniteSocketStreamer<>(); - - sockStmr.setAddr(addr); - - sockStmr.setPort(PORT); - - sockStmr.setIgnite(ignite); - - sockStmr.setStreamer(stmr); - - sockStmr.setTupleExtractor(new StreamTupleExtractor() { - @Override public Map.Entry extract(String word) { - // By using AffinityUuid we ensure that identical - // words are processed on the same cluster node. - return new IgniteBiTuple<>(new AffinityUuid(word), word); - } - }); - - sockStmr.start(); - - sendData(addr, PORT); - } - } - } - - /** - * @param addr Address. - * @param port Port. - */ - private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { - try (Socket sock = new Socket(addr, port); - OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { - while (true) { - try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt"); - LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) { - for (String line = rdr.readLine(); line != null; line = rdr.readLine()) { - for (String word : line.split(" ")) { - if (!word.isEmpty()) { - // Stream words into Ignite through socket. - try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream out = new ObjectOutputStream(bos)) { - - // Write message - out.writeObject(word); - - byte[] arr = bos.toByteArray(); - - // Write message length - oos.write(arr.length >>> 24); - oos.write(arr.length >>> 16); - oos.write(arr.length >>> 8); - oos.write(arr.length); - - oos.write(arr); - } - } - } - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java new file mode 100644 index 0000000..c5ec079 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.wordcount.*; +import org.apache.ignite.stream.socket.*; + +import java.io.*; +import java.net.*; + +/** + * Sends words to socket server based on {@link SocketStreamer} using message size based protocol. + *

+ * To start the example, you should: + *

    + *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • + *
  • Start socket server using {@link WordsSocketStreamerServer}.
  • + *
  • Start a few socket clients using {@link WordsSocketStreamerClient}.
  • + *
  • Start querying popular words using {@link QueryWords}.
  • + *
+ *

+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class WordsSocketStreamerClient { + /** Port. */ + private static final int PORT = 5555; + + /** + * @param args Args. + */ + public static void main(String[] args) throws IOException { + InetAddress addr = InetAddress.getLocalHost(); + + try (Socket sock = new Socket(addr, PORT); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + + System.out.println("Words streaming started."); + + while (true) { + try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt"); + LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) { + for (String line = rdr.readLine(); line != null; line = rdr.readLine()) { + for (String word : line.split(" ")) { + if (!word.isEmpty()) { + // Stream words into Ignite through socket. + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + + // Write message + out.writeObject(word); + + byte[] arr = bos.toByteArray(); + + // Write message length + oos.write(arr.length >>> 24); + oos.write(arr.length >>> 16); + oos.write(arr.length >>> 8); + oos.write(arr.length); + + oos.write(arr); + } + } + } + } + } + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java new file mode 100644 index 0000000..5af746d --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.wordcount.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.socket.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Receives words through socket using {@link SocketStreamer} and message size based protocol + * and streams them into Ignite cache. + *

+ * To start the example, you should: + *

    + *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • + *
  • Start socket server using {@link WordsSocketStreamerServer}.
  • + *
  • Start a few socket clients using {@link WordsSocketStreamerClient}.
  • + *
  • Start querying popular words using {@link QueryWords}.
  • + *
+ *

+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class WordsSocketStreamerServer { + /** Port. */ + private static final int PORT = 5555; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + Ignite ignite = Ignition.start("examples/config/example-ignite.xml"); + + if (!ExamplesUtils.hasServerNodes(ignite)) { + ignite.close(); + + return; + } + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.wordCache()); + + IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName()); + + InetAddress addr = InetAddress.getLocalHost(); + + // Configure socket streamer + SocketStreamer sockStmr = new SocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(String word) { + // By using AffinityUuid we ensure that identical + // words are processed on the same cluster node. + return new IgniteBiTuple<>(new AffinityUuid(word), word); + } + }); + + sockStmr.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java deleted file mode 100644 index fa5aa28..0000000 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.examples.streaming.socket; - -import org.apache.ignite.*; -import org.apache.ignite.cache.affinity.*; -import org.apache.ignite.examples.*; -import org.apache.ignite.examples.streaming.wordcount.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.stream.adapters.*; -import org.apache.ignite.stream.socket.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -/** - * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based - * protocol. - *

- * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams - * zero-terminated strings. - *

- * To start the example, you should: - *

    - *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • - *
  • Start streaming using {@link ZStringsSocketStreamerExample}.
  • - *
  • Start querying popular numbers using {@link QueryWords}.
  • - *
- *

- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. - */ -public class ZStringsSocketStreamerExample { - /** Port. */ - private static final int PORT = 5555; - - /** Delimiter. */ - private static final byte[] DELIM = new byte[] {0}; - - /** - * @param args Args. - */ - public static void main(String[] args) throws InterruptedException, IOException { - // Mark this cluster member as client. - Ignition.setClientMode(true); - - try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { - if (!ExamplesUtils.hasServerNodes(ignite)) - return; - - // The cache is configured with sliding window holding 1 second of the streaming data. - IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.wordCache()); - - try (IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName())) { - InetAddress addr = InetAddress.getLocalHost(); - - // Configure socket streamer - IgniteSocketStreamer sockStmr = new IgniteSocketStreamer<>(); - - sockStmr.setAddr(addr); - - sockStmr.setPort(PORT); - - sockStmr.setDelimiter(DELIM); - - sockStmr.setIgnite(ignite); - - sockStmr.setStreamer(stmr); - - // Converter from zero-terminated string to Java strings. - sockStmr.setConverter(new SocketMessageConverter() { - @Override public String convert(byte[] msg) { - try { - return new String(msg, "ASCII"); - } - catch (UnsupportedEncodingException e) { - throw new IgniteException(e); - } - } - }); - - sockStmr.setTupleExtractor(new StreamTupleExtractor() { - @Override public Map.Entry extract(String word) { - // By using AffinityUuid we ensure that identical - // words are processed on the same cluster node. - return new IgniteBiTuple<>(new AffinityUuid(word), word); - } - }); - - sockStmr.start(); - - sendData(addr, PORT); - } - } - } - - /** - * @param addr Address. - * @param port Port. - */ - private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { - try (Socket sock = new Socket(addr, port); - OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { - - while (true) { - try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt"); - LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) { - for (String line = rdr.readLine(); line != null; line = rdr.readLine()) { - for (String word : line.split(" ")) { - if (!word.isEmpty()) { - // Stream words into Ignite through socket. - byte[] arr = word.getBytes("ASCII"); - - // Write message - oos.write(arr); - - // Write message delimiter - oos.write(DELIM); - } - } - } - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java new file mode 100644 index 0000000..c17ccdc --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.wordcount.*; +import org.apache.ignite.stream.socket.*; + +import java.io.*; +import java.net.*; + +/** + * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol. + * Example illustrates usage of TCP socket streamer in case of non-Java clients. + * In this example words are zero-terminated strings. + *

+ * To start the example, you should: + *

    + *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • + *
  • Start socket server using {@link ZWordsSocketStreamerServer}.
  • + *
  • Start a few socket clients using {@link ZWordsSocketStreamerClient}.
  • + *
  • Start querying popular words using {@link QueryWords}.
  • + *
+ *

+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class ZWordsSocketStreamerClient { + /** Port. */ + private static final int PORT = 5555; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0}; + + /** + * @param args Args. + */ + public static void main(String[] args) throws IOException { + InetAddress addr = InetAddress.getLocalHost(); + + try (Socket sock = new Socket(addr, PORT); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + + System.out.println("Words streaming started."); + + while (true) { + try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt"); + LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) { + for (String line = rdr.readLine(); line != null; line = rdr.readLine()) { + for (String word : line.split(" ")) { + if (!word.isEmpty()) { + // Stream words into Ignite through socket. + byte[] arr = word.getBytes("ASCII"); + + // Write message + oos.write(arr); + + // Write message delimiter + oos.write(DELIM); + } + } + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java new file mode 100644 index 0000000..a0ef9da --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.cache.affinity.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.wordcount.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.socket.*; + +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol + * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients. + * In this example words are zero-terminated strings. + *

+ * To start the example, you should: + *

    + *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • + *
  • Start socket server using {@link ZWordsSocketStreamerServer}.
  • + *
  • Start a few socket clients using {@link ZWordsSocketStreamerClient}.
  • + *
  • Start querying popular words using {@link QueryWords}.
  • + *
+ *

+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class ZWordsSocketStreamerServer { + /** Port. */ + private static final int PORT = 5555; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0}; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + Ignite ignite = Ignition.start("examples/config/example-ignite.xml"); + + if (!ExamplesUtils.hasServerNodes(ignite)) { + ignite.close(); + + return; + } + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.wordCache()); + + IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName()); + + InetAddress addr = InetAddress.getLocalHost(); + + // Configure socket streamer + SocketStreamer sockStmr = new SocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setDelimiter(DELIM); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + // Converter from zero-terminated string to Java strings. + sockStmr.setConverter(new SocketMessageConverter() { + @Override public String convert(byte[] msg) { + try { + return new String(msg, "ASCII"); + } + catch (UnsupportedEncodingException e) { + throw new IgniteException(e); + } + } + }); + + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(String word) { + // By using AffinityUuid we ensure that identical + // words are processed on the same cluster node. + return new IgniteBiTuple<>(new AffinityUuid(word), word); + } + }); + + sockStmr.start(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java index d0a480a..c516ab4 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java @@ -16,7 +16,6 @@ */ /** - * - * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples. + * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples. */ package org.apache.ignite.examples.streaming.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java index 58704ca..d17b97d 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java @@ -26,7 +26,7 @@ import javax.cache.expiry.*; import static java.util.concurrent.TimeUnit.*; /** - * Configuration for the streaming cache to store the stream of random numbers. + * Configuration for the streaming cache to store the stream of words. * This cache is configured with sliding window of 1 second, which means that * data older than 1 second will be automatically removed from the cache. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java index 3bd9d3d..149aa79 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java @@ -30,7 +30,7 @@ import java.util.*; *

    *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • *
  • Start streaming using {@link StreamWords}.
  • - *
  • Start querying popular numbers using {@link QueryWords}.
  • + *
  • Start querying popular words using {@link QueryWords}.
  • *
*

* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java index c59fa51..cc3c0cb 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java @@ -29,7 +29,7 @@ import java.io.*; *

    *
  • Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.
  • *
  • Start streaming using {@link StreamWords}.
  • - *
  • Start querying popular numbers using {@link QueryWords}.
  • + *
  • Start querying popular words using {@link QueryWords}.
  • *
*

* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java index 010f86a..5d48ae3 100644 --- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java @@ -16,7 +16,6 @@ */ /** - * * Streaming word count example. */ package org.apache.ignite.examples.streaming.wordcount; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java new file mode 100644 index 0000000..0c4e2d1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import org.apache.ignite.*; + +import java.util.*; + +/** + * Convenience adapter for streamers. Adapters are optional components for + * streaming from different data sources. The purpose of adapters is to + * convert different message formats into Ignite stream key-value tuples + * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}. + */ +public abstract class StreamAdapter { + /** Tuple extractor. */ + private StreamTupleExtractor extractor; + + /** Streamer. */ + private IgniteDataStreamer stmr; + + /** Ignite. */ + private Ignite ignite; + + /** + * Empty constructor. + */ + protected StreamAdapter() { + // No-op. + } + + /** + * Stream adapter. + * + * @param stmr Streamer. + * @param extractor Tuple extractor. + */ + protected StreamAdapter(IgniteDataStreamer stmr, StreamTupleExtractor extractor) { + this.stmr = stmr; + this.extractor = extractor; + } + + /** + * @return Provided data streamer. + */ + public IgniteDataStreamer getStreamer() { + return stmr; + } + + /** + * @param stmr Ignite data streamer. + */ + public void setStreamer(IgniteDataStreamer stmr) { + this.stmr = stmr; + } + + /** + * @return Provided tuple extractor. + */ + public StreamTupleExtractor getTupleExtractor() { + return extractor; + } + + /** + * @param extractor Extractor for key-value tuples from messages. + */ + public void setTupleExtractor(StreamTupleExtractor extractor) { + this.extractor = extractor; + } + + /** + * @return Provided {@link Ignite} instance. + */ + public Ignite getIgnite() { + return ignite; + } + + /** + * @param ignite {@link Ignite} instance. + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + + /** + * Converts given message to a tuple and adds it to the underlying streamer. + * + * @param msg Message to convert. + */ + protected void addMessage(T msg) { + Map.Entry e = extractor.extract(msg); + + if (e != null) + stmr.addData(e); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java new file mode 100644 index 0000000..d2a4ede --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream; + +import java.util.*; + +/** + * Stream tuple extractor to convert messages to Ignite key-value tuples. + */ +public interface StreamTupleExtractor { + /** + * Extracts a key-value tuple from a message. + * + * @param msg Message. + * @return Key-value tuple. + */ + public Map.Entry extract(T msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java deleted file mode 100644 index b99521a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.stream.adapters; - -import org.apache.ignite.*; - -import java.util.*; - -/** - * Convenience adapter for streamers. Adapters are optional components for - * streaming from different data sources. The purpose of adapters is to - * convert different message formats into Ignite stream key-value tuples - * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}. - */ -public abstract class StreamAdapter { - /** Tuple extractor. */ - private StreamTupleExtractor extractor; - - /** Streamer. */ - private IgniteDataStreamer stmr; - - /** Ignite. */ - private Ignite ignite; - - /** - * Empty constructor. - */ - protected StreamAdapter() { - // No-op. - } - - /** - * Stream adapter. - * - * @param stmr Streamer. - * @param extractor Tuple extractor. - */ - protected StreamAdapter(IgniteDataStreamer stmr, StreamTupleExtractor extractor) { - this.stmr = stmr; - this.extractor = extractor; - } - - /** - * @return Provided data streamer. - */ - public IgniteDataStreamer getStreamer() { - return stmr; - } - - /** - * @param stmr Ignite data streamer. - */ - public void setStreamer(IgniteDataStreamer stmr) { - this.stmr = stmr; - } - - /** - * @return Provided tuple extractor. - */ - public StreamTupleExtractor getTupleExtractor() { - return extractor; - } - - /** - * @param extractor Extractor for key-value tuples from messages. - */ - public void setTupleExtractor(StreamTupleExtractor extractor) { - this.extractor = extractor; - } - - /** - * @return Provided {@link Ignite} instance. - */ - public Ignite getIgnite() { - return ignite; - } - - /** - * @param ignite {@link Ignite} instance. - */ - public void setIgnite(Ignite ignite) { - this.ignite = ignite; - } - - /** - * Converts given message to a tuple and adds it to the underlying streamer. - * - * @param msg Message to convert. - */ - protected void addMessage(T msg) { - Map.Entry e = extractor.extract(msg); - - if (e != null) - stmr.addData(e); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java deleted file mode 100644 index 9b0c395..0000000 --- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.stream.adapters; - -import java.util.*; - -/** - * Stream tuple extractor to convert messages to Ignite key-value tuples. - */ -public interface StreamTupleExtractor { - /** - * Extracts a key-value tuple from a message. - * - * @param msg Message. - * @return Key-value tuple. - */ - public Map.Entry extract(T msg); -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java deleted file mode 100644 index a69ffc0..0000000 --- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Contains Ignite stream adapters. - */ -package org.apache.ignite.stream.adapters; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java deleted file mode 100644 index 66369ea..0000000 --- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java +++ /dev/null @@ -1,217 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.stream.socket; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.nio.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.stream.adapters.*; -import org.jetbrains.annotations.*; - -import java.net.*; -import java.nio.*; - -/** - * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and - * streams into {@link IgniteDataStreamer} instance. - *

- * By default server uses size-based message processing. That is every message sent over the socket is prepended with - * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then - * delimiter-based message processing will be used. That is every message sent over the socket is appended with - * provided delimiter. - *

- * Received messages through socket converts to Java object using standard serialization. Conversion functionality - * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from - * non Java clients). - */ -public class IgniteSocketStreamer extends StreamAdapter { - /** Default threads. */ - private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); - - /** Logger. */ - private IgniteLogger log; - - /** Address. */ - private InetAddress addr; - - /** Server port. */ - private int port; - - /** Threads number. */ - private int threads = DFLT_THREADS; - - /** Direct mode. */ - private boolean directMode; - - /** Delimiter. */ - private byte[] delim; - - /** Converter. */ - private SocketMessageConverter converter; - - /** Server. */ - private GridNioServer srv; - - /** - * Sets server address. - * - * @param addr Address. - */ - public void setAddr(InetAddress addr) { - this.addr = addr; - } - - /** - * Sets port number. - * - * @param port Port. - */ - public void setPort(int port) { - this.port = port; - } - - /** - * Sets threadds amount. - * - * @param threads Threads. - */ - public void setThreads(int threads) { - this.threads = threads; - } - - /** - * Sets direct mode flag. - * - * @param directMode Direct mode. - */ - public void setDirectMode(boolean directMode) { - this.directMode = directMode; - } - - /** - * Sets message delimiter. - * - * @param delim Delimiter. - */ - public void setDelimiter(byte[] delim) { - this.delim = delim; - } - - /** - * Sets message converter. - * - * @param converter Converter. - */ - public void setConverter(SocketMessageConverter converter) { - this.converter = converter; - } - - /** - * Starts streamer. - * - * @throws IgniteException If failed. - */ - public void start() { - A.notNull(getTupleExtractor(), "tupleExtractor"); - A.notNull(getStreamer(), "streamer"); - A.notNull(getIgnite(), "ignite"); - A.ensure(threads > 0, "threads > 0"); - - log = getIgnite().log(); - - GridNioServerListener lsnr = new GridNioServerListenerAdapter() { - @Override public void onConnected(GridNioSession ses) { - assert ses.accepted(); - - if (log.isDebugEnabled()) - log.debug("Accepted connection: " + ses.remoteAddress()); - } - - @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { - if (e != null) - log.error("Connection failed with exception", e); - } - - @Override public void onMessage(GridNioSession ses, byte[] msg) { - addMessage(converter.convert(msg)); - } - }; - - ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; - - GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : - new GridDelimitedParser(delim, directMode); - - if (converter == null) - converter = new DefaultConverter<>(); - - GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); - - GridNioFilter[] filters = new GridNioFilter[] {codec}; - - try { - srv = new GridNioServer.Builder() - .address(addr == null ? InetAddress.getLocalHost() : addr) - .port(port) - .listener(lsnr) - .logger(log) - .selectorCount(threads) - .byteOrder(byteOrder) - .filters(filters) - .build(); - } - catch (IgniteCheckedException | UnknownHostException e) { - throw new IgniteException(e); - } - - srv.start(); - - if (log.isDebugEnabled()) - log.debug("Socket streaming server started on " + addr + ':' + port); - } - - /** - * Stops streamer. - */ - public void stop() { - srv.stop(); - - if (log.isDebugEnabled()) - log.debug("Socket streaming server stopped"); - } - - /** - * Converts message to Java object using Jdk marshaller. - */ - private static class DefaultConverter implements SocketMessageConverter { - /** Marshaller. */ - private static final JdkMarshaller MARSH = new JdkMarshaller(); - - /** {@inheritDoc} */ - @Override public T convert(byte[] msg) { - try { - return MARSH.unmarshal(msg, null); - } - catch (IgniteCheckedException e) { - throw new IgniteException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java new file mode 100644 index 0000000..07ce77e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.stream.*; + +import org.jetbrains.annotations.*; + +import java.net.*; +import java.nio.*; + +/** + * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and + * streams into {@link IgniteDataStreamer} instance. + *

+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with + * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then + * delimiter-based message processing will be used. That is every message sent over the socket is appended with + * provided delimiter. + *

+ * Received messages through socket converts to Java object using standard serialization. Conversion functionality + * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from + * non Java clients). + */ +public class SocketStreamer extends StreamAdapter { + /** Default threads. */ + private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); + + /** Logger. */ + private IgniteLogger log; + + /** Address. */ + private InetAddress addr; + + /** Server port. */ + private int port; + + /** Threads number. */ + private int threads = DFLT_THREADS; + + /** Direct mode. */ + private boolean directMode; + + /** Delimiter. */ + private byte[] delim; + + /** Converter. */ + private SocketMessageConverter converter; + + /** Server. */ + private GridNioServer srv; + + /** + * Sets server address. + * + * @param addr Address. + */ + public void setAddr(InetAddress addr) { + this.addr = addr; + } + + /** + * Sets port number. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Sets threadds amount. + * + * @param threads Threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets direct mode flag. + * + * @param directMode Direct mode. + */ + public void setDirectMode(boolean directMode) { + this.directMode = directMode; + } + + /** + * Sets message delimiter. + * + * @param delim Delimiter. + */ + public void setDelimiter(byte[] delim) { + this.delim = delim; + } + + /** + * Sets message converter. + * + * @param converter Converter. + */ + public void setConverter(SocketMessageConverter converter) { + this.converter = converter; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. + */ + public void start() { + A.notNull(getTupleExtractor(), "tupleExtractor"); + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.ensure(threads > 0, "threads > 0"); + + log = getIgnite().log(); + + GridNioServerListener lsnr = new GridNioServerListenerAdapter() { + @Override public void onConnected(GridNioSession ses) { + assert ses.accepted(); + + if (log.isDebugEnabled()) + log.debug("Accepted connection: " + ses.remoteAddress()); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) + log.error("Connection failed with exception", e); + } + + @Override public void onMessage(GridNioSession ses, byte[] msg) { + addMessage(converter.convert(msg)); + } + }; + + ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : + new GridDelimitedParser(delim, directMode); + + if (converter == null) + converter = new DefaultConverter<>(); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); + + GridNioFilter[] filters = new GridNioFilter[] {codec}; + + try { + srv = new GridNioServer.Builder() + .address(addr == null ? InetAddress.getLocalHost() : addr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(threads) + .byteOrder(byteOrder) + .filters(filters) + .build(); + } + catch (IgniteCheckedException | UnknownHostException e) { + throw new IgniteException(e); + } + + srv.start(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server started on " + addr + ':' + port); + } + + /** + * Stops streamer. + */ + public void stop() { + srv.stop(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server stopped"); + } + + /** + * Converts message to Java object using Jdk marshaller. + */ + private static class DefaultConverter implements SocketMessageConverter { + /** Marshaller. */ + private static final JdkMarshaller MARSH = new JdkMarshaller(); + + /** {@inheritDoc} */ + @Override public T convert(byte[] msg) { + try { + return MARSH.unmarshal(msg, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java deleted file mode 100644 index 19852ce..0000000 --- a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java +++ /dev/null @@ -1,315 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.stream.socket; - -import org.apache.ignite.*; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.events.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.marshaller.*; -import org.apache.ignite.marshaller.jdk.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.stream.adapters.*; -import org.apache.ignite.testframework.junits.common.*; - -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.events.EventType.*; - -/** - * Tests {@link IgniteSocketStreamer}. - */ -public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest { - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Grid count. */ - private final static int GRID_CNT = 3; - - /** Count. */ - private static final int CNT = 500; - - /** Delimiter. */ - private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0}; - - /** Port. */ - private static int port; - - /** Ignite. */ - private static Ignite ignite; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration() throws Exception { - IgniteConfiguration cfg = super.getConfiguration(); - - CacheConfiguration ccfg = cacheConfiguration(cfg, null); - - cfg.setCacheConfiguration(ccfg); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - return cfg; - } - - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ignite = startGrids(GRID_CNT); - ignite.getOrCreateCache(defaultCacheConfiguration()); - - try (ServerSocket sock = new ServerSocket(0)) { - port = sock.getLocalPort(); - } - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - stopAllGrids(); - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - ignite.cache(null).clear(); - } - - /** - * @throws Exception If failed. - */ - public void testSizeBasedDefaultConverter() throws Exception { - test(null, null, new Runnable() { - @Override public void run() { - try (Socket sock = new Socket(InetAddress.getLocalHost(), port); - OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { - Marshaller marsh = new JdkMarshaller(); - - for (int i = 0; i < CNT; i++) { - byte[] msg = marsh.marshal(new Tuple(i)); - - os.write(msg.length >>> 24); - os.write(msg.length >>> 16); - os.write(msg.length >>> 8); - os.write(msg.length); - - os.write(msg); - } - } - catch (IOException | IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testSizeBasedCustomConverter() throws Exception { - SocketMessageConverter converter = new SocketMessageConverter() { - @Override public Tuple convert(byte[] msg) { - int i = (msg[0] & 0xFF) << 24; - i |= (msg[1] & 0xFF) << 16; - i |= (msg[2] & 0xFF) << 8; - i |= msg[3] & 0xFF; - - return new Tuple(i); - } - }; - - test(converter, null, new Runnable() { - @Override public void run() { - try(Socket sock = new Socket(InetAddress.getLocalHost(), port); - OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { - - for (int i = 0; i < CNT; i++) { - os.write(0); - os.write(0); - os.write(0); - os.write(4); - - os.write(i >>> 24); - os.write(i >>> 16); - os.write(i >>> 8); - os.write(i); - } - } - catch (IOException e) { - throw new IgniteException(e); - } - } - }); - } - - /** - * @throws Exception If failed. - */ - public void testDelimiterBasedDefaultConverter() throws Exception { - test(null, DELIM, new Runnable() { - @Override public void run() { - try(Socket sock = new Socket(InetAddress.getLocalHost(), port); - OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { - Marshaller marsh = new JdkMarshaller(); - - for (int i = 0; i < CNT; i++) { - byte[] msg = marsh.marshal(new Tuple(i)); - - os.write(msg); - os.write(DELIM); - } - } - catch (IOException | IgniteCheckedException e) { - throw new IgniteException(e); - } - } - }); - - } - - /** - * @throws Exception If failed. - */ - public void testDelimiterBasedCustomConverter() throws Exception { - SocketMessageConverter converter = new SocketMessageConverter() { - @Override public Tuple convert(byte[] msg) { - int i = (msg[0] & 0xFF) << 24; - i |= (msg[1] & 0xFF) << 16; - i |= (msg[2] & 0xFF) << 8; - i |= msg[3] & 0xFF; - - return new Tuple(i); - } - }; - - test(converter, DELIM, new Runnable() { - @Override public void run() { - try(Socket sock = new Socket(InetAddress.getLocalHost(), port); - OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { - - for (int i = 0; i < CNT; i++) { - os.write(i >>> 24); - os.write(i >>> 16); - os.write(i >>> 8); - os.write(i); - - os.write(DELIM); - } - } - catch (IOException e) { - throw new IgniteException(e); - } - } - }); - } - - /** - * @param converter Converter. - * @param r Runnable.. - */ - private void test(@Nullable SocketMessageConverter converter, @Nullable byte[] delim, Runnable r) throws Exception - { - IgniteSocketStreamer sockStmr = null; - - try (IgniteDataStreamer stmr = ignite.dataStreamer(null)) { - - stmr.allowOverwrite(true); - stmr.autoFlushFrequency(10); - - sockStmr = new IgniteSocketStreamer<>(); - - IgniteCache cache = ignite.cache(null); - - sockStmr.setIgnite(ignite); - - sockStmr.setStreamer(stmr); - - sockStmr.setPort(port); - - sockStmr.setDelimiter(delim); - - sockStmr.setTupleExtractor(new StreamTupleExtractor() { - @Override public Map.Entry extract(Tuple msg) { - return new IgniteBiTuple<>(msg.key, msg.val); - } - }); - - if (converter != null) - sockStmr.setConverter(converter); - - final CountDownLatch latch = new CountDownLatch(CNT); - - IgniteBiPredicate locLsnr = new IgniteBiPredicate() { - @Override public boolean apply(UUID uuid, CacheEvent evt) { - latch.countDown(); - - return true; - } - }; - - ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT); - - sockStmr.start(); - - r.run(); - - latch.await(); - - assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); - - for (int i = 0; i < CNT; i++) - assertEquals(Integer.toString(i), cache.get(i)); - } - finally { - if (sockStmr != null) - sockStmr.stop(); - } - - } - - /** - * Tuple. - */ - private static class Tuple implements Serializable { - /** Serial version uid. */ - private static final long serialVersionUID = 0L; - - /** Key. */ - private final int key; - - /** Value. */ - private final String val; - - /** - * @param key Key. - */ - Tuple(int key) { - this.key = key; - this.val = Integer.toString(key); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java new file mode 100644 index 0000000..752e43c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.testframework.junits.common.*; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Tests {@link SocketStreamer}. + */ +public class SocketStreamerSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid count. */ + private final static int GRID_CNT = 3; + + /** Count. */ + private static final int CNT = 500; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0}; + + /** Port. */ + private static int port; + + /** Ignite. */ + private static Ignite ignite; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CacheConfiguration ccfg = cacheConfiguration(cfg, null); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrids(GRID_CNT); + ignite.getOrCreateCache(defaultCacheConfiguration()); + + try (ServerSocket sock = new ServerSocket(0)) { + port = sock.getLocalPort(); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite.cache(null).clear(); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedDefaultConverter() throws Exception { + test(null, null, new Runnable() { + @Override public void run() { + try (Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { + byte[] msg = marsh.marshal(new Tuple(i)); + + os.write(msg.length >>> 24); + os.write(msg.length >>> 16); + os.write(msg.length >>> 8); + os.write(msg.length); + + os.write(msg); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedCustomConverter() throws Exception { + SocketMessageConverter converter = new SocketMessageConverter() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, null, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(0); + os.write(0); + os.write(0); + os.write(4); + + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedDefaultConverter() throws Exception { + test(null, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { + byte[] msg = marsh.marshal(new Tuple(i)); + + os.write(msg); + os.write(DELIM); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedCustomConverter() throws Exception { + SocketMessageConverter converter = new SocketMessageConverter() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + + os.write(DELIM); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @param converter Converter. + * @param r Runnable.. + */ + private void test(@Nullable SocketMessageConverter converter, @Nullable byte[] delim, Runnable r) throws Exception + { + SocketStreamer sockStmr = null; + + try (IgniteDataStreamer stmr = ignite.dataStreamer(null)) { + + stmr.allowOverwrite(true); + stmr.autoFlushFrequency(10); + + sockStmr = new SocketStreamer<>(); + + IgniteCache cache = ignite.cache(null); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + sockStmr.setPort(port); + + sockStmr.setDelimiter(delim); + + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(Tuple msg) { + return new IgniteBiTuple<>(msg.key, msg.val); + } + }); + + if (converter != null) + sockStmr.setConverter(converter); + + final CountDownLatch latch = new CountDownLatch(CNT); + + IgniteBiPredicate locLsnr = new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT); + + sockStmr.start(); + + r.run(); + + latch.await(); + + assertEquals(CNT, cache.size(CachePeekMode.PRIMARY)); + + for (int i = 0; i < CNT; i++) + assertEquals(Integer.toString(i), cache.get(i)); + } + finally { + if (sockStmr != null) + sockStmr.stop(); + } + + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Key. */ + private final int key; + + /** Value. */ + private final String val; + + /** + * @param key Key. + */ + Tuple(int key) { + this.key = key; + this.val = Integer.toString(key); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java index 87bbfbb..61be976 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java @@ -32,7 +32,7 @@ public class IgniteStreamTestSuite extends TestSuite { public static TestSuite suite() throws Exception { TestSuite suite = new TestSuite("Ignite Stream Test Suite"); - suite.addTest(new TestSuite(IgniteSocketStreamerSelfTest.class)); + suite.addTest(new TestSuite(SocketStreamerSelfTest.class)); return suite; }