ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject incubator-ignite git commit: ignite-430 IgniteSocketStreamer to stream data from TCP socket.
Date Tue, 31 Mar 2015 17:50:11 GMT
Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-430 2077a88e4 -> 98d9511a4


ignite-430 IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/98d9511a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/98d9511a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/98d9511a

Branch: refs/heads/ignite-430
Commit: 98d9511a441c209609b3dcebf29dca24efcfa011
Parents: 2077a88
Author: Andrey Gura <agura@gridgain.com>
Authored: Tue Mar 31 20:49:30 2015 +0300
Committer: Andrey Gura <agura@gridgain.com>
Committed: Tue Mar 31 20:49:30 2015 +0300

----------------------------------------------------------------------
 .../streaming/SocketStreamerExample.java        |   2 +-
 .../streaming/TextSocketStreamerExample.java    |   2 +-
 .../stream/socket/IgniteSocketStreamer.java     |  98 +++++++++
 .../stream/socket/IgniteTextSocketStreamer.java |  83 ++++++++
 .../apache/ignite/stream/socket/Receiver.java   | 182 +++++++++++++++++
 .../ignite/stream/socket/package-info.java      |  21 ++
 .../ignite/streaming/IgniteSocketStreamer.java  |  98 ---------
 .../streaming/IgniteTextSocketStreamer.java     |  83 --------
 .../org/apache/ignite/streaming/Receiver.java   | 184 -----------------
 .../apache/ignite/streaming/package-info.java   |  21 --
 .../stream/socket/IgniteSocketStreamerTest.java | 135 ++++++++++++
 .../socket/IgniteTextSocketStreamerTest.java    | 123 +++++++++++
 .../ignite/stream/socket/ReceiverTest.java      | 202 ++++++++++++++++++
 .../streaming/IgniteSocketStreamerTest.java     | 134 ------------
 .../streaming/IgniteTextSocketStreamerTest.java | 122 -----------
 .../apache/ignite/streaming/ReceiverTest.java   | 203 -------------------
 pom.xml                                         |   4 +-
 17 files changed, 848 insertions(+), 849 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
index 5fea93e..7fdbea8 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/SocketStreamerExample.java
@@ -22,7 +22,7 @@ import org.apache.ignite.examples.*;
 import org.apache.ignite.examples.streaming.numbers.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.stream.*;
-import org.apache.ignite.streaming.*;
+import org.apache.ignite.stream.socket.*;
 
 import javax.cache.processor.*;
 import java.io.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
index 4677be1..93a70ae 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/TextSocketStreamerExample.java
@@ -22,7 +22,7 @@ import org.apache.ignite.examples.*;
 import org.apache.ignite.examples.streaming.numbers.*;
 import org.apache.ignite.lang.*;
 import org.apache.ignite.stream.*;
-import org.apache.ignite.streaming.*;
+import org.apache.ignite.stream.socket.*;
 
 import javax.cache.processor.*;
 import java.io.*;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
new file mode 100644
index 0000000..3540393
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Data streamer responsible for streaming data from a socket into a cache. Every object obtained from the socket is
+ * converted to a key-value pair using the converter.
+ *
+ * @param <E> Type of element obtained from socket.
+ * @param <K> Cache entry key type.
+ * @param <V> Cache entry value type.
+ */
+public class IgniteSocketStreamer<E, K, V> extends Receiver<E, K, V> {
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
+    /**
+     * Constructs socket streamer.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param streamer Streamer.
+     * @param converter Stream to entry converter.
+     */
+    public IgniteSocketStreamer(
+        String host,
+        int port,
+        IgniteDataStreamer<K, V> streamer,
+        IgniteClosure<E, Map.Entry<K, V>> converter
+    ) {
+        super(streamer, converter);
+
+        A.notNull(host, "host is null");
+
+        this.host = host;
+        this.port = port;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void receive() {
+        try (Socket sock = new Socket(host, port)) {
+            receive(sock);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /**
+     * Reads objects from the socket and adds them to the target data streamer.
+     *
+     * @param sock Socket.
+     */
+    @SuppressWarnings("unchecked")
+    private void receive(Socket sock) throws IOException {
+        try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) {
+            while (!isStopped()) {
+                try {
+                    E element = (E) ois.readObject();
+
+                    addData(element);
+                }
+                catch (EOFException e) {
+                    break;
+                }
+                catch (IOException | ClassNotFoundException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java
new file mode 100644
index 0000000..0852317
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamer.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Data streamer responsible for streaming data from a socket into a cache. Every line read from the socket is treated
+ * as a {@link String} instance and converted to a key-value pair using the converter.
+ *
+ * @param <K> Cache entry key type.
+ * @param <V> Cache entry value type.
+ */
+public class IgniteTextSocketStreamer<K, V> extends Receiver<String, K, V> {
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
+    /**
+     * Constructs text socket streamer.
+     *
+     * @param host Host.
+     * @param port Port.
+     * @param streamer Streamer.
+     * @param converter Stream to entries converter.
+     */
+    public IgniteTextSocketStreamer(
+        String host,
+        int port,
+        IgniteDataStreamer<K, V> streamer,
+        IgniteClosure<String, Map.Entry<K, V>> converter
+    ) {
+        super(streamer, converter);
+
+        A.notNull(host, "host is null");
+
+        this.host = host;
+        this.port = port;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void receive() {
+        try (Socket sock = new Socket(host, port)) {
+            loadData(sock);
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Reads text lines from the socket and adds each one to the target data streamer. */
+    private void loadData(Socket sock) throws IOException {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), "UTF-8"))) {
+            String val;
+
+            while (!isStopped() && (val = reader.readLine()) != null)
+                addData(val);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java
new file mode 100644
index 0000000..72c24ad
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/Receiver.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+
+import java.util.*;
+
+/**
+ * Base implementation of data receiver.
+ *
+ * @param <E> Type of stream element.
+ * @param <K> Type of cache entry key.
+ * @param <V> Type of cache entry value.
+ */
+public abstract class Receiver<E, K, V> {
+    /** Object monitor. */
+    private final Object lock = new Object();
+
+    /** Worker. */
+    private Thread worker;
+
+    /** State. */
+    private volatile State state = State.INITIALIZED;
+
+    /** Target streamer. */
+    private final IgniteDataStreamer<K, V> streamer;
+
+    /** Element to entries transformer. */
+    private final IgniteClosure<E, Map.Entry<K, V>> converter;
+
+    /** Restart interval in milliseconds. */
+    private volatile long restartInterval = 2000;
+
+    /**
+     * Constructs stream receiver.
+     *
+     * @param streamer Streamer.
+     * @param converter Element to entries transformer.
+     */
+    public Receiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) {
+        A.notNull(streamer, "streamer is null");
+        A.notNull(converter, "converter is null");
+
+        this.streamer = streamer;
+        this.converter = converter;
+    }
+
+    /**
+     * Sets restart interval in milliseconds.
+     *
+     * @param interval Interval in milliseconds.
+     */
+    public void restartInterval(long interval) {
+        A.ensure(interval > 0, "interval > 0");
+
+        this.restartInterval = interval;
+    }
+
+    /**
+     * Starts receiver.
+     */
+    public void start() {
+        synchronized (lock) {
+            if (state != State.INITIALIZED)
+                throw new IllegalStateException("Receiver in " + state + " state can't be started.");
+
+            worker = new Thread(new ReceiverWorker());
+
+            worker.start();
+
+            state = State.STARTED;
+        }
+    }
+
+    /**
+     * Stops receiver.
+     */
+    public void stop() {
+        synchronized (lock) {
+            if (state != State.STARTED)
+                throw new IllegalStateException("Receiver in " + state + " state can't be stopped.");
+
+            state = State.STOPPED;
+        }
+
+        try {
+            worker.join();
+        }
+        catch (InterruptedException e) {
+            // No-op.
+        }
+    }
+
+    /**
+     * Checks whether receiver is started or not.
+     *
+     * @return {@code True} if receiver is started, {@code false} - otherwise.
+     */
+    public boolean isStarted() {
+        return state == State.STARTED;
+    }
+
+    /**
+     * Checks whether receiver is stopped or not.
+     *
+     * @return {@code True} if receiver is stopped, {@code false} - otherwise.
+     */
+    public boolean isStopped() {
+        return state == State.STOPPED;
+    }
+
+    /**
+     * Performs actual data receiving.
+     */
+    protected abstract void receive();
+
+    /**
+     * Convert stream data to cache entry and transfer it to the target streamer.
+     *
+     * @param element Element.
+     */
+    protected void addData(E element) {
+        streamer.addData(converter.apply(element));
+    }
+
+    /**
+     * Receiver state.
+     */
+    public enum State {
+        /** New. */
+        INITIALIZED,
+        /** Started. */
+        STARTED,
+        /** Stopped. */
+        STOPPED
+    }
+
+    /**
+     * Receiver worker that actually receives data from socket.
+     */
+    private class ReceiverWorker implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            while (true) {
+                try {
+                    receive();
+                }
+                catch (Exception e) {
+                    // No-op.
+                }
+
+                if (isStopped())
+                    return;
+
+                try {
+                    Thread.sleep(restartInterval);
+                }
+                catch (InterruptedException e) {
+                    // No-op.
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..2644b33
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains Ignite socket streaming classes.
+ */
+package org.apache.ignite.stream.socket;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
deleted file mode 100644
index 72b6082..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteSocketStreamer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket converts
- * to key-value pair using converter.
- *
- * @param <E> Type of element obtained from socket.
- * @param <K> Cache entry key type.
- * @param <V> Cache entry value type.
- */
-public class IgniteSocketStreamer<E, K, V> extends Receiver<E, K, V> {
-    /** Host. */
-    private final String host;
-
-    /** Port. */
-    private final int port;
-
-    /**
-     * Constructs socket streamer.
-     *
-     * @param host Host.
-     * @param port Port.
-     * @param streamer Streamer.
-     * @param converter Stream to entry converter.
-     */
-    public IgniteSocketStreamer(
-        String host,
-        int port,
-        IgniteDataStreamer<K, V> streamer,
-        IgniteClosure<E, Map.Entry<K, V>> converter
-    ) {
-        super(streamer, converter);
-
-        A.notNull(host, "host is null");
-
-        this.host = host;
-        this.port = port;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void receive() {
-        try (Socket sock = new Socket(host, port)) {
-            receive(sock);
-        }
-        catch (Exception e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /**
-     * Reads data from socket and adds them into target data stream.
-     *
-     * @param sock Socket.
-     */
-    @SuppressWarnings("unchecked")
-    private void receive(Socket sock) throws IOException {
-        try (ObjectInputStream ois = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()))) {
-            while (!isStopped()) {
-                try {
-                    E element = (E) ois.readObject();
-
-                    addData(element);
-                }
-                catch (EOFException e) {
-                    break;
-                }
-                catch (IOException | ClassNotFoundException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
deleted file mode 100644
index 8094d37..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streaming/IgniteTextSocketStreamer.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Data streamer is responsible for streaming data from socket into cache. Every object obtained from socket treats as
- * {@link String} instance and converts to key-value pair using converter.
- *
- * @param <K> Cache entry key type.
- * @param <V> Cache entry value type.
- */
-public class IgniteTextSocketStreamer<K, V> extends Receiver<String, K, V> {
-    /** Host. */
-    private final String host;
-
-    /** Port. */
-    private final int port;
-
-    /**
-     * Constructs text socket streamer.
-     *
-     * @param host Host.
-     * @param port Port.
-     * @param streamer Streamer.
-     * @param converter Stream to entries converter.
-     */
-    public IgniteTextSocketStreamer(
-        String host,
-        int port,
-        IgniteDataStreamer<K, V> streamer,
-        IgniteClosure<String, Map.Entry<K, V>> converter
-    ) {
-        super(streamer, converter);
-
-        A.notNull(host, "host is null");
-
-        this.host = host;
-        this.port = port;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void receive() {
-        try (Socket sock = new Socket(host, port)) {
-            loadData(sock);
-        }
-        catch (Exception e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    private void loadData(Socket sock) throws IOException {
-        try (BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), "UTF-8"))) {
-            String val;
-
-            while (!isStopped() && (val = reader.readLine()) != null)
-                addData(val);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java b/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java
deleted file mode 100644
index 71a59bf..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streaming/Receiver.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Base implementation of data receiver.
- *
- * @param <E> Type of stream element.
- * @param <K> Type of cache entry key.
- * @param <V> Type of cache entry value.
- */
-public abstract class Receiver<E, K, V> {
-    /** Object monitor. */
-    private final Object lock = new Object();
-
-    /** Stop latch. */
-    private final CountDownLatch stopLatch = new CountDownLatch(1);
-
-    /** State. */
-    private volatile State state = State.INITIALIZED;
-
-    /** Target streamer. */
-    private final IgniteDataStreamer<K, V> streamer;
-
-    /** Element to entries transformer. */
-    private final IgniteClosure<E, Map.Entry<K, V>> converter;
-
-    /** Restart interval in milliseconds. */
-    private volatile long restartInterval = 2000;
-
-    /**
-     * Constructs stream receiver.
-     *
-     * @param streamer Streamer.
-     * @param converter Element to entries transformer.
-     */
-    public Receiver(IgniteDataStreamer<K, V> streamer, IgniteClosure<E, Map.Entry<K, V>> converter) {
-        A.notNull(streamer, "streamer is null");
-        A.notNull(converter, "converter is null");
-
-        this.streamer = streamer;
-        this.converter = converter;
-    }
-
-    /**
-     * Sets restart interval in milliseconds.
-     *
-     * @param interval Interval in milliseconds.
-     */
-    public void restartInterval(long interval) {
-        A.ensure(interval > 0, "interval > 0");
-
-        this.restartInterval = interval;
-    }
-
-    /**
-     * Starts receiver.
-     */
-    public void start() {
-        synchronized (lock) {
-            if (state != State.INITIALIZED)
-                throw new IllegalStateException("Receiver in " + state + " state can't be started.");
-
-            new Thread(new ReceiverWorker()).start();
-
-            state = State.STARTED;
-        }
-    }
-
-    /**
-     * Stops receiver.
-     */
-    public void stop() {
-        synchronized (lock) {
-            if (state != State.STARTED)
-                throw new IllegalStateException("Receiver in " + state + " state can't be stopped.");
-
-            state = State.STOPPED;
-
-            try {
-                stopLatch.await();
-            }
-            catch (InterruptedException e) {
-                // No-op.
-            }
-        }
-    }
-
-    /**
-     * Checks whether receiver is started or not.
-     *
-     * @return {@code True} if receiver is started, {@code false} - otherwise.
-     */
-    public boolean isStarted() {
-        return state == State.STARTED;
-    }
-
-    /**
-     * Checks whether receiver is stopped or not.
-     *
-     * @return {@code True} if receiver is stopped, {@code false} - otherwise.
-     */
-    public boolean isStopped() {
-        return state == State.STOPPED;
-    }
-
-    /**
-     * Performs actual data receiving.
-     */
-    protected abstract void receive();
-
-    /**
-     * Convert stream data to cache entry and transfer it to the target streamer.
-     *
-     * @param element Element.
-     */
-    protected void addData(E element) {
-        streamer.addData(converter.apply(element));
-    }
-
-    /**
-     * Receiver state.
-     */
-    public enum State {
-        /** New. */
-        INITIALIZED,
-        /** Started. */
-        STARTED,
-        /** Stopped. */
-        STOPPED
-    }
-
-    /**
-     * Receiver worker that actually receives data from socket.
-     */
-    private class ReceiverWorker implements Runnable {
-        /** {@inheritDoc} */
-        @Override public void run() {
-            while (true) {
-                try {
-                    receive();
-                }
-                catch (Throwable e) {
-                    // No-op.
-                }
-
-                if (isStopped()) {
-                    stopLatch.countDown();
-
-                    break;
-                }
-
-                try {
-                    Thread.sleep(restartInterval);
-                }
-                catch (InterruptedException e) {
-                    // No-op.
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java b/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java
deleted file mode 100644
index 79bb27e..0000000
--- a/modules/core/src/main/java/org/apache/ignite/streaming/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains Ignite streaming classes.
- */
-package org.apache.ignite.streaming;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java
new file mode 100644
index 0000000..bff973a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.socket.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for data loading using {@link IgniteSocketStreamer}.
+ */
+public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
+    /** Host. */
+    private static final String HOST = "localhost";
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Entry count. */
+    private static final int ENTRY_CNT = 5000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests data loading.
+     */
+    public void testStreamer() throws Exception {
+        try (Ignite g = startGrid()) {
+
+            IgniteCache<Integer, String> cache = g.cache(null);
+
+            cache.clear();
+
+            try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) {
+
+                startServer();
+
+                IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) {
+                            return new IgniteBiTuple<>(input.getKey(), input.getValue());
+                        }
+                    };
+
+                final AtomicInteger cnt = new AtomicInteger();
+
+                IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr =
+                    new IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String>(
+                            HOST, PORT, stmr, converter
+                    ) {
+                        @Override protected void addData(IgniteBiTuple<Integer, String> element) {
+                            super.addData(element);
+
+                            cnt.incrementAndGet();
+                        }
+                    };
+
+                sockStmr.start();
+
+                // Wait until all data has been streamed.
+                while (cnt.get() < ENTRY_CNT)
+                    Thread.sleep(100);
+
+                sockStmr.stop();
+
+                assertFalse(sockStmr.isStarted());
+                assertTrue(sockStmr.isStopped());
+            }
+
+            assertEquals(ENTRY_CNT, cache.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts streaming server and writes data into socket.
+     */
+    private static void startServer() {
+        new Thread() {
+            @Override public void run() {
+                try (ServerSocket srvSock = new ServerSocket(PORT);
+                     Socket sock = srvSock.accept();
+                     ObjectOutputStream oos =
+                         new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) {
+
+                    for (int i = 0; i < ENTRY_CNT; i++)
+                        oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i)));
+                }
+                catch (IOException e) {
+                    // No-op.
+                }
+            }
+        }.start();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java
new file mode 100644
index 0000000..9a87715
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteTextSocketStreamerTest.java
@@ -0,0 +1,123 @@
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.socket.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.cache.CacheMode.*;
+
+/**
+ * Test for data loading using {@link IgniteTextSocketStreamer}.
+ */
+public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest {
+    /** Host. */
+    private static final String HOST = "localhost";
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Entry count. */
+    private static final int ENTRY_CNT = 5000;
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setCacheMode(PARTITIONED);
+
+        ccfg.setBackups(1);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+    /**
+     * Tests data loading.
+     */
+    public void testStream() throws Exception {
+        try (Ignite g = startGrid()) {
+
+            IgniteCache<Integer, String> cache = g.cache(null);
+
+            cache.clear();
+
+            try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) {
+
+                startServer();
+
+                IgniteClosure<String, Map.Entry<Integer, String>> converter =
+                    new IgniteClosure<String, Map.Entry<Integer, String>>() {
+                        @Override public Map.Entry<Integer, String> apply(String input) {
+                            String[] pair = input.split("=", 2);
+
+                            return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
+                        }
+                    };
+
+                final AtomicInteger cnt = new AtomicInteger();
+
+                IgniteTextSocketStreamer<Integer, String> sockStmr =
+                    new IgniteTextSocketStreamer<Integer, String>(HOST, PORT, stmr, converter) {
+                        @Override protected void addData(String element) {
+                            super.addData(element);
+
+                            cnt.incrementAndGet();
+                        }
+                    };
+
+                sockStmr.start();
+
+                // Wait until all data has been streamed.
+                while (cnt.get() < ENTRY_CNT)
+                    Thread.sleep(100);
+
+                sockStmr.stop();
+
+                assertFalse(sockStmr.isStarted());
+                assertTrue(sockStmr.isStopped());
+            }
+
+            assertEquals(ENTRY_CNT, cache.size());
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * Starts streaming server and writes data into socket.
+     */
+    private static void startServer() {
+        new Thread() {
+            @Override public void run() {
+                try (ServerSocket srvSock = new ServerSocket(PORT);
+                     Socket sock = srvSock.accept();
+                     BufferedWriter writer =
+                         new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
+
+                    for (int i = 0; i < ENTRY_CNT; i++) {
+                        String num = Integer.toString(i);
+
+                        writer.write(num + '=' + num);
+
+                        writer.newLine();
+                    }
+                }
+                catch (IOException e) {
+                    // No-op.
+                }
+            }
+        }.start();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java
new file mode 100644
index 0000000..a6754ba
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/ReceiverTest.java
@@ -0,0 +1,202 @@
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.lang.*;
+import junit.framework.*;
+import org.jetbrains.annotations.*;
+
+import java.util.*;
+
+/**
+ * Tests for {@link Receiver}.
+ */
+public class ReceiverTest extends TestCase {
+    /** Converter. */
+    private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER =
+        new IgniteClosure<Integer, Map.Entry<Integer, String>>() {
+            @Override public Map.Entry<Integer, String> apply(Integer input) {
+                return new IgniteBiTuple<>(input, input.toString());
+            }
+    };
+
+    /** Stmr. */
+    private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>();
+
+    /** Receiver. */
+    private final Receiver<Integer, Integer, String> receiver =
+        new Receiver<Integer, Integer, String>(STMR, CONVERTER) {
+            @Override protected void receive() {
+                while (!isStopped()) {
+                    try {
+                        Thread.sleep(50);
+                    }
+                    catch (InterruptedException e) {
+                        // No-op.
+                    }
+                }
+            }
+        };
+
+    /**
+     * Tests receiver behavior in case of forced termination.
+     *
+     * @throws Exception If error occurred.
+     */
+    public void testReceiver() throws Exception {
+        assertFalse(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        receiver.start();
+
+        assertTrue(receiver.isStarted());
+        assertFalse(receiver.isStopped());
+
+        // Wait for some period before stop.
+        Thread.sleep(500);
+
+        receiver.stop();
+
+        assertFalse(receiver.isStarted());
+        assertTrue(receiver.isStopped());
+
+        try {
+            receiver.start();
+            fail("IllegalStateException expected.");
+        }
+        catch (IllegalStateException e) {
+            // No-op
+        }
+
+        try {
+            receiver.stop();
+            fail("IllegalStateException expected.");
+        }
+        catch (IllegalStateException e) {
+            // No-op
+        }
+    }
+
+    /**
+     * Data streamer stub.
+     *
+     * @param <K> Key type.
+     * @param <V> Value type.
+     */
+    private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> {
+        /** {@inheritDoc} */
+        @Override public String cacheName() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean allowOverwrite() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean skipStore() {
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void skipStore(boolean skipStore) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int perNodeBufferSize() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void perNodeBufferSize(int bufSize) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public int perNodeParallelOperations() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void perNodeParallelOperations(int parallelOps) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public long autoFlushFrequency() {
+            return 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void autoFlushFrequency(long autoFlushFreq) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> future() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void deployClass(Class<?> depCls) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries)
+                throws IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void flush() throws IgniteException, IllegalStateException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void tryFlush() throws IgniteException, IllegalStateException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close(boolean cancel) throws IgniteException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IgniteException {
+            // No-op.
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
deleted file mode 100644
index d7357c7..0000000
--- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteSocketStreamerTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Test for data loading using {@link IgniteSocketStreamer}.
- */
-public class IgniteSocketStreamerTest extends GridCommonAbstractTest {
-    /** Host. */
-    private static final String HOST = "localhost";
-
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /** Entry count. */
-    private static final int ENTRY_CNT = 5000;
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setCacheMode(PARTITIONED);
-
-        ccfg.setBackups(1);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /**
-     * Tests data loading.
-     */
-    public void testStreamer() throws Exception {
-        try (Ignite g = startGrid()) {
-
-            IgniteCache<Integer, String> cache = g.cache(null);
-
-            cache.clear();
-
-            try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) {
-
-                startServer();
-
-                IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>> converter =
-                    new IgniteClosure<IgniteBiTuple<Integer, String>, Map.Entry<Integer, String>>() {
-                        @Override public Map.Entry<Integer, String> apply(IgniteBiTuple<Integer, String> input) {
-                            return new IgniteBiTuple<>(input.getKey(), input.getValue());
-                        }
-                    };
-
-                final AtomicInteger cnt = new AtomicInteger();
-
-                IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String> sockStmr =
-                    new IgniteSocketStreamer<IgniteBiTuple<Integer, String>, Integer, String>(
-                            HOST, PORT, stmr, converter
-                    ) {
-                        @Override protected void addData(IgniteBiTuple<Integer, String> element) {
-                            super.addData(element);
-
-                            cnt.incrementAndGet();
-                        }
-                    };
-
-                sockStmr.start();
-
-                // Wait for all data streamed.
-                while (cnt.get() < ENTRY_CNT)
-                    Thread.sleep(100);
-
-                sockStmr.stop();
-
-                assertFalse(sockStmr.isStarted());
-                assertTrue(sockStmr.isStopped());
-            }
-
-            assertEquals(ENTRY_CNT, cache.size());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Starts streaming server and writes data into socket.
-     */
-    private static void startServer() {
-        new Thread() {
-            @Override public void run() {
-                try (ServerSocket srvSock = new ServerSocket(PORT);
-                     Socket sock = srvSock.accept();
-                     ObjectOutputStream oos =
-                         new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()))) {
-
-                    for (int i = 0; i < ENTRY_CNT; i++)
-                        oos.writeObject(new IgniteBiTuple<>(i, Integer.toString(i)));
-                }
-                catch (IOException e) {
-                    // No-op.
-                }
-            }
-        }.start();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
deleted file mode 100644
index 436bc8f..0000000
--- a/modules/core/src/test/java/org/apache/ignite/streaming/IgniteTextSocketStreamerTest.java
+++ /dev/null
@@ -1,122 +0,0 @@
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.cache.CacheMode.*;
-
-/**
- * Test for data loading using {@link IgniteTextSocketStreamer}.
- */
-public class IgniteTextSocketStreamerTest extends GridCommonAbstractTest {
-    /** Host. */
-    private static final String HOST = "localhost";
-
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /** Entry count. */
-    private static final int ENTRY_CNT = 5000;
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        CacheConfiguration ccfg = new CacheConfiguration();
-
-        ccfg.setCacheMode(PARTITIONED);
-
-        ccfg.setBackups(1);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        return cfg;
-    }
-
-    /**
-     * Tests data loading.
-     */
-    public void testStream() throws Exception {
-        try (Ignite g = startGrid()) {
-
-            IgniteCache<Integer, String> cache = g.cache(null);
-
-            cache.clear();
-
-            try (IgniteDataStreamer<Integer, String> stmr = g.dataStreamer(null)) {
-
-                startServer();
-
-                IgniteClosure<String, Map.Entry<Integer, String>> converter =
-                    new IgniteClosure<String, Map.Entry<Integer, String>>() {
-                        @Override public Map.Entry<Integer, String> apply(String input) {
-                            String[] pair = input.split("=", 2);
-
-                            return new IgniteBiTuple<>(Integer.parseInt(pair[0]), pair[1]);
-                        }
-                    };
-
-                final AtomicInteger cnt = new AtomicInteger();
-
-                IgniteTextSocketStreamer<Integer, String> sockStmr =
-                    new IgniteTextSocketStreamer<Integer, String>(HOST, PORT, stmr, converter) {
-                        @Override protected void addData(String element) {
-                            super.addData(element);
-
-                            cnt.incrementAndGet();
-                        }
-                    };
-
-                sockStmr.start();
-
-                // Wait for all data streamed.
-                while (cnt.get() < ENTRY_CNT)
-                    Thread.sleep(100);
-
-                sockStmr.stop();
-
-                assertFalse(sockStmr.isStarted());
-                assertTrue(sockStmr.isStopped());
-            }
-
-            assertEquals(ENTRY_CNT, cache.size());
-        }
-        finally {
-            stopAllGrids();
-        }
-    }
-
-    /**
-     * Starts streaming server and writes data into socket.
-     */
-    private static void startServer() {
-        new Thread() {
-            @Override public void run() {
-                try (ServerSocket srvSock = new ServerSocket(PORT);
-                     Socket sock = srvSock.accept();
-                     BufferedWriter writer =
-                         new BufferedWriter(new OutputStreamWriter(sock.getOutputStream(), "UTF-8"))) {
-
-                    for (int i = 0; i < ENTRY_CNT; i++) {
-                        String num = Integer.toString(i);
-
-                        writer.write(num + '=' + num);
-
-                        writer.newLine();
-                    }
-                }
-                catch (IOException e) {
-                    // No-op.
-                }
-            }
-        }.start();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java b/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java
deleted file mode 100644
index 9f4e056..0000000
--- a/modules/core/src/test/java/org/apache/ignite/streaming/ReceiverTest.java
+++ /dev/null
@@ -1,203 +0,0 @@
-package org.apache.ignite.streaming;
-
-import org.apache.ignite.*;
-import org.apache.ignite.lang.*;
-
-import junit.framework.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-/**
- * Tests for {@link Receiver}.
- */
-public class ReceiverTest extends TestCase {
-    /** Converter. */
-    private static final IgniteClosure<Integer, Map.Entry<Integer, String>> CONVERTER =
-        new IgniteClosure<Integer, Map.Entry<Integer, String>>() {
-            @Override public Map.Entry<Integer, String> apply(Integer input) {
-                return new IgniteBiTuple<>(input, input.toString());
-            }
-    };
-
-    /** Stmr. */
-    private static final IgniteDataStreamer<Integer, String> STMR = new DataStreamerStub<>();
-
-    /** Receiver. */
-    private final Receiver<Integer, Integer, String> receiver =
-        new Receiver<Integer, Integer, String>(STMR, CONVERTER) {
-            @Override protected void receive() {
-                while (!isStopped()) {
-                    try {
-                        Thread.sleep(50);
-                    }
-                    catch (InterruptedException e) {
-                        // No-op.
-                    }
-                }
-            }
-        };
-
-    /**
-     * Tests receiver behavior in case of forced termination.
-     *
-     * @throws Exception If error occurred.
-     */
-    public void testReceiver() throws Exception {
-        assertFalse(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        receiver.start();
-
-        assertTrue(receiver.isStarted());
-        assertFalse(receiver.isStopped());
-
-        // Wait for some period before stop.
-        Thread.sleep(500);
-
-        receiver.stop();
-
-        assertFalse(receiver.isStarted());
-        assertTrue(receiver.isStopped());
-
-        try {
-            receiver.start();
-            fail("IllegalStateException expected.");
-        }
-        catch (IllegalStateException e) {
-            // No-op
-        }
-
-        try {
-            receiver.stop();
-            fail("IllegalStateException expected.");
-        }
-        catch (IllegalStateException e) {
-            // No-op
-        }
-    }
-
-    /**
-     * Receiver stub.
-     *
-     * @param <K> Key type.
-     * @param <V> Value type.
-     */
-    private static class DataStreamerStub<K, V> implements IgniteDataStreamer<K, V> {
-        /** {@inheritDoc} */
-        @Override public String cacheName() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean allowOverwrite() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void allowOverwrite(boolean allowOverwrite) throws IgniteException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean skipStore() {
-            return false;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void skipStore(boolean skipStore) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public int perNodeBufferSize() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void perNodeBufferSize(int bufSize) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public int perNodeParallelOperations() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void perNodeParallelOperations(int parallelOps) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public long autoFlushFrequency() {
-            return 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void autoFlushFrequency(long autoFlushFreq) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> future() {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void deployClass(Class<?> depCls) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void receiver(org.apache.ignite.stream.StreamReceiver<K, V> rcvr) {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> removeData(K key) throws IgniteException, IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(K key, @Nullable V val) throws IgniteException, IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(Map.Entry<K, V> entry) throws IgniteException, IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(Collection<? extends Map.Entry<K, V>> entries)
-                throws IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public IgniteFuture<?> addData(Map<K, V> entries) throws IllegalStateException {
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void flush() throws IgniteException, IllegalStateException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void tryFlush() throws IgniteException, IllegalStateException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close(boolean cancel) throws IgniteException {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws IgniteException {
-            // No-op.
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/98d9511a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 7faf021..1a550fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -787,7 +787,7 @@
                                         </group>
                                         <group>
                                             <title>Streaming APIs</title>
-                                            <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages>
+                                            <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.stream.socket</packages>
                                         </group>
                                         <group>
                                             <title>Security APIs</title>
@@ -982,7 +982,7 @@
                                         </group>
                                         <group>
                                             <title>Streaming APIs</title>
-                                            <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.streaming</packages>
+                                            <packages>org.apache.ignite.streamer:org.apache.ignite.streamer.router:org.apache.ignite.streamer.window:org.apache.ignite.streamer.index:org.apache.ignite.streamer.index.hash:org.apache.ignite.streamer.index.tree:org.apache.ignite.stream:org.apache.ignite.stream.socket</packages>
                                         </group>
                                         <group>
                                             <title>Security APIs</title>


Mime
View raw message