Return-Path: X-Original-To: apmail-ignite-commits-archive@minotaur.apache.org Delivered-To: apmail-ignite-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D15AB1087A for ; Tue, 19 May 2015 18:23:43 +0000 (UTC) Received: (qmail 50587 invoked by uid 500); 19 May 2015 18:23:43 -0000 Delivered-To: apmail-ignite-commits-archive@ignite.apache.org Received: (qmail 50557 invoked by uid 500); 19 May 2015 18:23:43 -0000 Mailing-List: contact commits-help@ignite.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ignite.incubator.apache.org Delivered-To: mailing list commits@ignite.incubator.apache.org Received: (qmail 50505 invoked by uid 99); 19 May 2015 18:23:43 -0000 Received: from Unknown (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 May 2015 18:23:43 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 2DF5E182929 for ; Tue, 19 May 2015 18:23:43 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: 0.791 X-Spam-Level: X-Spam-Status: No, score=0.791 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, T_RP_MATCHES_RCVD=-0.01, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-eu-west.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id l_CjizlgG79z for ; Tue, 19 May 2015 18:23:28 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-eu-west.apache.org (ASF Mail Server at mx1-eu-west.apache.org) with SMTP id 869DF254BC for ; Tue, 19 May 2015 18:23:17 +0000 (UTC) Received: (qmail 48535 invoked by uid 99); 19 May 2015 18:23:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) 
(140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 19 May 2015 18:23:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6F387E35B7; Tue, 19 May 2015 18:23:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: agoncharuk@apache.org To: commits@ignite.incubator.apache.org Date: Tue, 19 May 2015 18:23:27 -0000 Message-Id: <4b36a2d919a8402083eea9bf0cb545e9@git.apache.org> In-Reply-To: <925a694ef3e740b6b6ff97d628ea6b11@git.apache.org> References: <925a694ef3e740b6b6ff97d628ea6b11@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [12/40] incubator-ignite git commit: ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket. ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/53995dcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/53995dcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/53995dcb Branch: refs/heads/ignite-889 Commit: 53995dcb3470de07df73d8dfd284da0dbb8df8dd Parents: b0fbfa0 Author: agura Authored: Mon Apr 13 18:28:40 2015 +0300 Committer: agura Committed: Fri May 15 03:44:27 2015 +0300 ---------------------------------------------------------------------- .../streaming/socket/SocketStreamerExample.java | 158 ++++++++++ .../socket/ZStringsSocketStreamerExample.java | 151 +++++++++ .../examples/streaming/socket/package-info.java | 21 ++ .../internal/util/nio/GridBufferedParser.java | 4 - .../internal/util/nio/GridDelimitedParser.java | 91 ++++++ .../util/nio/GridNioDelimitedBuffer.java | 106 +++++++ .../ignite/stream/adapters/StreamAdapter.java | 17 + .../stream/socket/IgniteSocketStreamer.java | 217 +++++++++++++ .../stream/socket/SocketMessageConverter.java | 31 ++ .../ignite/stream/socket/package-info.java 
| 21 ++ .../util/nio/GridNioDelimitedBufferTest.java | 112 +++++++ .../socket/IgniteSocketStreamerSelfTest.java | 315 +++++++++++++++++++ .../ignite/stream/socket/package-info.java | 21 ++ .../testsuites/IgniteStreamTestSuite.java | 39 +++ .../testsuites/IgniteUtilSelfTestSuite.java | 1 + 15 files changed, 1301 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java new file mode 100644 index 0000000..73cb970 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.stream.socket.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}. + * To start the example, you should: + *
<ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link SocketStreamerExample}.</li>
+ *     <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class SocketStreamerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Port. */ + private static final int PORT = 5555; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer() { + @Override public Object process(MutableEntry e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + InetAddress addr = InetAddress.getLocalHost(); + + IgniteSocketStreamer sockStmr = new IgniteSocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(Tuple tuple) { + return new IgniteBiTuple<>(tuple.key, tuple.cnt); + } + }); + + sockStmr.start(); + + sendData(addr, PORT); + } + } + } + + /** + * @param addr Address. + * @param port Port. 
+ */ + private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { + try (Socket sock = new Socket(addr, port); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + while (true) { + try (ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream out = new ObjectOutputStream(bos)) { + Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L); + + out.writeObject(tuple); + + byte[] arr = bos.toByteArray(); + + oos.write(arr.length >>> 24); + oos.write(arr.length >>> 16); + oos.write(arr.length >>> 8); + oos.write(arr.length); + + oos.write(arr); + } + } + } + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0; + + /** Key. */ + private final int key; + + /** Count. */ + private final long cnt; + + /** + * @param key Key. + * @param cnt Count. + */ + public Tuple(int key, long cnt) { + this.key = key; + this.cnt = cnt; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java new file mode 100644 index 0000000..a535c73 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.examples.streaming.socket; + +import org.apache.ignite.*; +import org.apache.ignite.examples.*; +import org.apache.ignite.examples.streaming.numbers.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.stream.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.stream.socket.*; + +import javax.cache.processor.*; +import java.io.*; +import java.net.*; +import java.util.*; + +/** + * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}. + *

+ * This example illustrates usage of the TCP socket streamer with non-Java clients. In this example the client streams
+ * zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ *     <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM. + */ +public class ZStringsSocketStreamerExample { + /** Random number generator. */ + private static final Random RAND = new Random(); + + /** Range within which to generate numbers. */ + private static final int RANGE = 1000; + + /** Port. */ + private static final int PORT = 5555; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0}; + + /** + * @param args Args. + */ + public static void main(String[] args) throws InterruptedException, IOException { + // Mark this cluster member as client. + Ignition.setClientMode(true); + + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + if (!ExamplesUtils.hasServerNodes(ignite)) + return; + + // The cache is configured with sliding window holding 1 second of the streaming data. + IgniteCache stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache()); + + try (IgniteDataStreamer stmr = ignite.dataStreamer(stmCache.getName())) { + // Allow data updates. + stmr.allowOverwrite(true); + + // Configure data transformation to count instances of the same word. + stmr.receiver(new StreamTransformer() { + @Override public Object process(MutableEntry e, Object... objects) + throws EntryProcessorException { + Long val = e.getValue(); + + e.setValue(val == null ? 1L : val + 1); + + return null; + } + }); + + InetAddress addr = InetAddress.getLocalHost(); + + IgniteSocketStreamer sockStmr = new IgniteSocketStreamer<>(); + + sockStmr.setAddr(addr); + + sockStmr.setPort(PORT); + + sockStmr.setDelimiter(DELIM); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + // Converter from zero-terminated string to Java strings. 
+ sockStmr.setConverter(new SocketMessageConverter() { + @Override public String convert(byte[] msg) { + try { + return new String(msg, "ASCII"); + } + catch (UnsupportedEncodingException e) { + throw new IgniteException(e); + } + } + }); + + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(String input) { + String[] pair = input.split("="); + return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1])); + } + }); + + sockStmr.start(); + + sendData(addr, PORT); + } + } + } + + /** + * @param addr Address. + * @param port Port. + */ + private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException { + try (Socket sock = new Socket(addr, port); + OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) { + + while (true) { + int key = RAND.nextInt(RANGE); + + String str = key + "=1"; + + byte[] arr = str.getBytes("ASCII"); + + oos.write(arr); + oos.write(DELIM); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java new file mode 100644 index 0000000..ae7bdf9 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples. + */ +package org.apache.ignite.examples.streaming.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java index 3f81dc4..a03d2c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.util.nio; import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.internal.*; import java.io.*; import java.nio.*; @@ -33,9 +32,6 @@ import java.nio.*; * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE | * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ * - *

- * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream, - * isn't equal to these bytes than exception will be thrown. */ public class GridBufferedParser implements GridNioParser { /** Buffer metadata key. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java new file mode 100644 index 0000000..256597c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.apache.ignite.*; + +import java.io.*; +import java.nio.*; + +/** + * This class implements stream parser based on {@link GridNioDelimitedBuffer}. + *

+ * The rule for this parser is that every message sent over the stream is followed by a
+ * delimiter (byte array). So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ *     |   MESSAGE  | DELIMITER  |  MESSAGE  | DELIMITER  |
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * </pre>
+ */ +public class GridDelimitedParser implements GridNioParser { + /** Buffer metadata key. */ + private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Delimiter. */ + private final byte[] delim; + + /** Direct buffer. */ + private final boolean directBuf; + + /** + * @param delim Delimiter. + * @param directBuf Direct buffer. + */ + public GridDelimitedParser(byte[] delim, boolean directBuf) { + this.delim = delim; + this.directBuf = directBuf; + } + + /** {@inheritDoc} */ + @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY); + + // Decode for a given session is called per one thread, so there should not be any concurrency issues. + // However, we make some additional checks. + if (nioBuf == null) { + nioBuf = new GridNioDelimitedBuffer(delim); + + GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); + + assert old == null; + } + + return nioBuf.read(buf); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + byte[] msg0 = (byte[])msg; + + int cap = msg0.length + delim.length; + ByteBuffer res = directBuf ? 
ByteBuffer.allocateDirect(cap) : ByteBuffer.allocate(cap); + + res.put(msg0); + res.put(delim); + + res.flip(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return this.getClass().getSimpleName(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java new file mode 100644 index 0000000..2b764ec --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import org.jetbrains.annotations.*; + +import java.nio.*; +import java.util.*; + +/** + * Buffer with message delimiter support. + */ +public class GridNioDelimitedBuffer { + /** Delimiter. */ + private final byte[] delim; + + /** Data. 
*/ + private byte[] data = new byte[16384]; + + /** Count. */ + private int cnt; + + /** Index. */ + private int idx; + + /** + * @param delim Delimiter. + */ + public GridNioDelimitedBuffer(byte[] delim) { + assert delim != null; + assert delim.length > 0; + + this.delim = delim; + + reset(); + } + + /** + * Resets buffer state. + */ + private void reset() { + cnt = 0; + idx = 0; + } + + /** + * @param buf Buffer. + * @return Message bytes or {@code null} if message is not fully read yet. + */ + @Nullable public byte[] read(ByteBuffer buf) { + while(buf.hasRemaining()) { + if (cnt == data.length) + data = Arrays.copyOf(data, data.length * 2); + + byte b = buf.get(); + + data[cnt++] = b; + + if (b == delim[idx]) + idx++; + else if (idx > 0) { + int pos = cnt - idx; + + idx = 0; + + for (int i = pos; i < cnt; i++) { + if (data[pos] == delim[idx]) { + pos++; + + idx++; + } + else { + pos = cnt - idx; + + idx = 0; + } + } + } + + if (idx == delim.length) { + byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length); + + reset(); + + return bytes; + } + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java index c729362..b99521a 100644 --- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java @@ -34,6 +34,9 @@ public abstract class StreamAdapter { /** Streamer. */ private IgniteDataStreamer stmr; + /** Ignite. */ + private Ignite ignite; + /** * Empty constructor. */ @@ -81,6 +84,20 @@ public abstract class StreamAdapter { } /** + * @return Provided {@link Ignite} instance. 
+ */ + public Ignite getIgnite() { + return ignite; + } + + /** + * @param ignite {@link Ignite} instance. + */ + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } + + /** * Converts given message to a tuple and adds it to the underlying streamer. * * @param msg Message to convert. http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java new file mode 100644 index 0000000..66369ea --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.nio.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.stream.adapters.*; +import org.jetbrains.annotations.*; + +import java.net.*; +import java.nio.*; + +/** + * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and + * streams into {@link IgniteDataStreamer} instance. + *

+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with + * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then + * delimiter-based message processing will be used. That is every message sent over the socket is appended with + * provided delimiter. + *

+ * Messages received through the socket are converted to Java objects using standard serialization. Conversion functionality + * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from + * non-Java clients). + */ +public class IgniteSocketStreamer extends StreamAdapter { + /** Default threads. */ + private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors(); + + /** Logger. */ + private IgniteLogger log; + + /** Address. */ + private InetAddress addr; + + /** Server port. */ + private int port; + + /** Threads number. */ + private int threads = DFLT_THREADS; + + /** Direct mode. */ + private boolean directMode; + + /** Delimiter. */ + private byte[] delim; + + /** Converter. */ + private SocketMessageConverter converter; + + /** Server. */ + private GridNioServer srv; + + /** + * Sets server address. + * + * @param addr Address. + */ + public void setAddr(InetAddress addr) { + this.addr = addr; + } + + /** + * Sets port number. + * + * @param port Port. + */ + public void setPort(int port) { + this.port = port; + } + + /** + * Sets the number of threads. + * + * @param threads Threads. + */ + public void setThreads(int threads) { + this.threads = threads; + } + + /** + * Sets direct mode flag. + * + * @param directMode Direct mode. + */ + public void setDirectMode(boolean directMode) { + this.directMode = directMode; + } + + /** + * Sets message delimiter. + * + * @param delim Delimiter. + */ + public void setDelimiter(byte[] delim) { + this.delim = delim; + } + + /** + * Sets message converter. + * + * @param converter Converter. + */ + public void setConverter(SocketMessageConverter converter) { + this.converter = converter; + } + + /** + * Starts streamer. + * + * @throws IgniteException If failed. 
+ */ + public void start() { + A.notNull(getTupleExtractor(), "tupleExtractor"); + A.notNull(getStreamer(), "streamer"); + A.notNull(getIgnite(), "ignite"); + A.ensure(threads > 0, "threads > 0"); + + log = getIgnite().log(); + + GridNioServerListener lsnr = new GridNioServerListenerAdapter() { + @Override public void onConnected(GridNioSession ses) { + assert ses.accepted(); + + if (log.isDebugEnabled()) + log.debug("Accepted connection: " + ses.remoteAddress()); + } + + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + if (e != null) + log.error("Connection failed with exception", e); + } + + @Override public void onMessage(GridNioSession ses, byte[] msg) { + addMessage(converter.convert(msg)); + } + }; + + ByteOrder byteOrder = ByteOrder.BIG_ENDIAN; + + GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) : + new GridDelimitedParser(delim, directMode); + + if (converter == null) + converter = new DefaultConverter<>(); + + GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode); + + GridNioFilter[] filters = new GridNioFilter[] {codec}; + + try { + srv = new GridNioServer.Builder() + .address(addr == null ? InetAddress.getLocalHost() : addr) + .port(port) + .listener(lsnr) + .logger(log) + .selectorCount(threads) + .byteOrder(byteOrder) + .filters(filters) + .build(); + } + catch (IgniteCheckedException | UnknownHostException e) { + throw new IgniteException(e); + } + + srv.start(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server started on " + addr + ':' + port); + } + + /** + * Stops streamer. + */ + public void stop() { + srv.stop(); + + if (log.isDebugEnabled()) + log.debug("Socket streaming server stopped"); + } + + /** + * Converts message to Java object using Jdk marshaller. + */ + private static class DefaultConverter implements SocketMessageConverter { + /** Marshaller. 
*/ + private static final JdkMarshaller MARSH = new JdkMarshaller(); + + /** {@inheritDoc} */ + @Override public T convert(byte[] msg) { + try { + return MARSH.unmarshal(msg, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java new file mode 100644 index 0000000..8161d86 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.stream.socket; + +/** + * Socket message converter. + */ +public interface SocketMessageConverter { + /** + * Converts a message represented by an array of bytes to an object. + * + * @param msg Message. + * @return Converted object. 
+ */ + public T convert(byte[] msg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..e1cef65 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains socket streamer implementation. 
+ */ +package org.apache.ignite.stream.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java new file mode 100644 index 0000000..a0dd2e5 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import junit.framework.TestCase; + +import java.nio.*; +import java.util.*; + +/** + * Tests for {@link GridNioDelimitedBuffer}. 
+ */ +public class GridNioDelimitedBufferTest extends TestCase { + /** */ + private static final String ASCII = "ASCII"; + + /** + * Tests simple delimiter (excluded from alphabet) + */ + public void testReadZString() throws Exception { + Random rnd = new Random(); + + int buffSize = 0; + + byte[] delim = new byte[] {0}; + + List strs = new ArrayList<>(50); + + for (int i = 0; i < 50; i++) { + int len = rnd.nextInt(128) + 1; + + buffSize += len + delim.length; + + StringBuilder sb = new StringBuilder(len); + + for (int j = 0; j < len; j++) + sb.append((char)(rnd.nextInt(26) + 'a')); + + + strs.add(sb.toString()); + } + + ByteBuffer buff = ByteBuffer.allocate(buffSize); + + for (String str : strs) { + buff.put(str.getBytes(ASCII)); + buff.put(delim); + } + + buff.flip(); + + byte[] msg; + + GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim); + + List res = new ArrayList<>(strs.size()); + + while ((msg = delimBuff.read(buff)) != null) + res.add(new String(msg, ASCII)); + + assertEquals(strs, res); + } + + /** + * Tests compound delimiter (included to alphabet) + */ + public void testDelim() throws Exception { + byte[] delim = "aabb".getBytes(ASCII); + + List strs = Arrays.asList("za", "zaa", "zaab", "zab", "zaabaababbbbabaab"); + + int buffSize = 0; + + for (String str : strs) + buffSize += str.length() + delim.length; + + ByteBuffer buff = ByteBuffer.allocate(buffSize); + + for (String str : strs) { + buff.put(str.getBytes(ASCII)); + buff.put(delim); + } + + buff.flip(); + + byte[] msg; + + GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim); + + List res = new ArrayList<>(strs.size()); + + while ((msg = delimBuff.read(buff)) != null) + res.add(new String(msg, ASCII)); + + assertEquals(strs, res); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java new file mode 100644 index 0000000..19852ce --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.stream.socket; + +import org.apache.ignite.*; +import org.apache.ignite.cache.*; +import org.apache.ignite.configuration.*; +import org.apache.ignite.events.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.marshaller.*; +import org.apache.ignite.marshaller.jdk.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; +import org.apache.ignite.stream.adapters.*; +import org.apache.ignite.testframework.junits.common.*; + +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.events.EventType.*; + +/** + * Tests {@link IgniteSocketStreamer}. + */ +public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** Grid count. */ + private final static int GRID_CNT = 3; + + /** Count. */ + private static final int CNT = 500; + + /** Delimiter. */ + private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0}; + + /** Port. */ + private static int port; + + /** Ignite. 
*/ + private static Ignite ignite; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + CacheConfiguration ccfg = cacheConfiguration(cfg, null); + + cfg.setCacheConfiguration(ccfg); + + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(discoSpi); + + return cfg; + } + + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + ignite = startGrids(GRID_CNT); + ignite.getOrCreateCache(defaultCacheConfiguration()); + + try (ServerSocket sock = new ServerSocket(0)) { + port = sock.getLocalPort(); + } + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ignite.cache(null).clear(); + } + + /** + * @throws Exception If failed. + */ + public void testSizeBasedDefaultConverter() throws Exception { + test(null, null, new Runnable() { + @Override public void run() { + try (Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { + byte[] msg = marsh.marshal(new Tuple(i)); + + os.write(msg.length >>> 24); + os.write(msg.length >>> 16); + os.write(msg.length >>> 8); + os.write(msg.length); + + os.write(msg); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. 
+ */ + public void testSizeBasedCustomConverter() throws Exception { + SocketMessageConverter converter = new SocketMessageConverter() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, null, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(0); + os.write(0); + os.write(0); + os.write(4); + + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @throws Exception If failed. + */ + public void testDelimiterBasedDefaultConverter() throws Exception { + test(null, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + Marshaller marsh = new JdkMarshaller(); + + for (int i = 0; i < CNT; i++) { + byte[] msg = marsh.marshal(new Tuple(i)); + + os.write(msg); + os.write(DELIM); + } + } + catch (IOException | IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }); + + } + + /** + * @throws Exception If failed. 
+ */ + public void testDelimiterBasedCustomConverter() throws Exception { + SocketMessageConverter converter = new SocketMessageConverter() { + @Override public Tuple convert(byte[] msg) { + int i = (msg[0] & 0xFF) << 24; + i |= (msg[1] & 0xFF) << 16; + i |= (msg[2] & 0xFF) << 8; + i |= msg[3] & 0xFF; + + return new Tuple(i); + } + }; + + test(converter, DELIM, new Runnable() { + @Override public void run() { + try(Socket sock = new Socket(InetAddress.getLocalHost(), port); + OutputStream os = new BufferedOutputStream(sock.getOutputStream())) { + + for (int i = 0; i < CNT; i++) { + os.write(i >>> 24); + os.write(i >>> 16); + os.write(i >>> 8); + os.write(i); + + os.write(DELIM); + } + } + catch (IOException e) { + throw new IgniteException(e); + } + } + }); + } + + /** + * @param converter Converter. + * @param r Runnable.. + */ + private void test(@Nullable SocketMessageConverter converter, @Nullable byte[] delim, Runnable r) throws Exception + { + IgniteSocketStreamer sockStmr = null; + + try (IgniteDataStreamer stmr = ignite.dataStreamer(null)) { + + stmr.allowOverwrite(true); + stmr.autoFlushFrequency(10); + + sockStmr = new IgniteSocketStreamer<>(); + + IgniteCache cache = ignite.cache(null); + + sockStmr.setIgnite(ignite); + + sockStmr.setStreamer(stmr); + + sockStmr.setPort(port); + + sockStmr.setDelimiter(delim); + + sockStmr.setTupleExtractor(new StreamTupleExtractor() { + @Override public Map.Entry extract(Tuple msg) { + return new IgniteBiTuple<>(msg.key, msg.val); + } + }); + + if (converter != null) + sockStmr.setConverter(converter); + + final CountDownLatch latch = new CountDownLatch(CNT); + + IgniteBiPredicate locLsnr = new IgniteBiPredicate() { + @Override public boolean apply(UUID uuid, CacheEvent evt) { + latch.countDown(); + + return true; + } + }; + + ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT); + + sockStmr.start(); + + r.run(); + + latch.await(); + + assertEquals(CNT, 
cache.size(CachePeekMode.PRIMARY)); + + for (int i = 0; i < CNT; i++) + assertEquals(Integer.toString(i), cache.get(i)); + } + finally { + if (sockStmr != null) + sockStmr.stop(); + } + + } + + /** + * Tuple. + */ + private static class Tuple implements Serializable { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Key. */ + private final int key; + + /** Value. */ + private final String val; + + /** + * @param key Key. + */ + Tuple(int key) { + this.key = key; + this.val = Integer.toString(key); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java new file mode 100644 index 0000000..2e28469 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains tests for socket streamer. 
+ */ +package org.apache.ignite.stream.socket; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java new file mode 100644 index 0000000..87bbfbb --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import org.apache.ignite.stream.socket.*; + +import junit.framework.*; + +/** + * Stream test suite. + */ +public class IgniteStreamTestSuite extends TestSuite { + /** + * @return Stream tests suite. + * @throws Exception If failed. 
+ */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Ignite Stream Test Suite"); + + suite.addTest(new TestSuite(IgniteSocketStreamerSelfTest.class)); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index 941b06e..32cd038 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -67,6 +67,7 @@ public class IgniteUtilSelfTestSuite extends TestSuite { suite.addTestSuite(GridNioSelfTest.class); suite.addTestSuite(GridNioFilterChainSelfTest.class); suite.addTestSuite(GridNioSslSelfTest.class); + suite.addTestSuite(GridNioDelimitedBufferTest.class); return suite; }