ignite-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sboi...@apache.org
Subject [01/17] ignite git commit: IGNITE-1370 Streamers: Implement multiple tuple extractor.
Date Tue, 22 Sep 2015 14:50:48 GMT
Repository: ignite
Updated Branches:
  refs/heads/ignite-1093-2 e5da2ca91 -> 94c9297ad


IGNITE-1370 Streamers: Implement multiple tuple extractor.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4d9734a0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4d9734a0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4d9734a0

Branch: refs/heads/ignite-1093-2
Commit: 4d9734a0842f0a46310c1f6261fdf42371db8705
Parents: b736c46
Author: Raul Kripalani <raulk@apache.org>
Authored: Thu Sep 3 23:31:08 2015 +0100
Committer: Raul Kripalani <raulk@apache.org>
Committed: Sun Sep 13 01:20:24 2015 +0100

----------------------------------------------------------------------
 .../org/apache/ignite/stream/StreamAdapter.java |  48 ++++++++-
 .../stream/StreamMultipleTupleExtractor.java    |  38 +++++++
 .../ignite/stream/StreamTupleExtractor.java     |   5 +
 .../ignite/stream/socket/SocketStreamer.java    |   3 +-
 .../stream/socket/SocketStreamerSelfTest.java   | 104 +++++++++++++++----
 5 files changed, 171 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
index 97edcbb..ffa0821 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.stream;
 
 import java.util.Map;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteDataStreamer;
 
@@ -26,11 +27,22 @@ import org.apache.ignite.IgniteDataStreamer;
  * streaming from different data sources. The purpose of adapters is to
  * convert different message formats into Ignite stream key-value tuples
  * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ * <p>
+ * Two types of tuple extractors are supported:
+ * <ol>
+ *     <li>A single tuple extractor, which extracts either zero or one tuple out of a message.
+ *     See {@link #setTupleExtractor(StreamTupleExtractor)}.</li>
+ *     <li>A multiple tuple extractor, which is capable of extracting multiple tuples out of a single message, in the
+ *     form of a {@link Map}. See {@link #setMultipleTupleExtractor(StreamMultipleTupleExtractor)}.</li>
+ * </ol>
  */
 public abstract class StreamAdapter<T, K, V> {
     /** Tuple extractor. */
     private StreamTupleExtractor<T, K, V> extractor;
 
+    /** Tuple extractor that supports extracting N tuples from a single event (1:n cardinality).
*/
+    private StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor;
+
     /** Streamer. */
     private IgniteDataStreamer<K, V> stmr;
 
@@ -84,6 +96,20 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
+     * @return Provided tuple extractor (for 1:n cardinality).
+     */
+    public StreamMultipleTupleExtractor<T, K, V> getMultipleTupleExtractor() {
+        return multipleTupleExtractor;
+    }
+
+    /**
+     * @param multipleTupleExtractor Extractor for 1:n tuple extraction.
+     */
+    public void setMultipleTupleExtractor(StreamMultipleTupleExtractor<T, K, V> multipleTupleExtractor)
{
+        this.multipleTupleExtractor = multipleTupleExtractor;
+    }
+
+    /**
      * @return Provided {@link Ignite} instance.
      */
     public Ignite getIgnite() {
@@ -98,14 +124,28 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
-     * Converts given message to a tuple and adds it to the underlying streamer.
+     * Converts given message to 1 or many tuples (depending on the type of extractor) and
adds it/them to the
+     * underlying streamer.
+     * <p>
+     * If both a {@link #multipleTupleExtractor} and a {@link #extractor} have been set,
the former will take precedence
+     * and the latter will be ignored.
      *
      * @param msg Message to convert.
      */
     protected void addMessage(T msg) {
-        Map.Entry<K, V> e = extractor.extract(msg);
+        if (multipleTupleExtractor == null) {
+            Map.Entry<K, V> e = extractor.extract(msg);
+
+            if (e != null)
+                stmr.addData(e);
 
-        if (e != null)
-            stmr.addData(e);
+        } else {
+            Map<K, V> m = multipleTupleExtractor.extract(msg);
+
+            if (m != null)
+                stmr.addData(m);
+
+        }
     }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
new file mode 100644
index 0000000..71ad45a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamMultipleTupleExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.Map;
+
+/**
+ * Stream tuple extractor to convert a single message to zero, one or many tuples.
+ * <p>
+ * For cases where cardinality will always be 1:1 (or 0:1), you may consider {@link StreamTupleExtractor}.
+ *
+ * @see StreamTupleExtractor
+ */
+public interface StreamMultipleTupleExtractor<T, K, V> {
+
+    /**
+     * Extracts a set of key-values from a message.
+     *
+     * @param msg Message.
+     * @return Map containing resulting tuples.
+     */
+    public Map<K, V> extract(T msg);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
index b6150ab..aed7d8a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -21,6 +21,11 @@ import java.util.Map;
 
 /**
  * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ * <p>
+ * Alternatively, {@link StreamMultipleTupleExtractor} can be employed in cases where a single
message/event may
+ * produce more than one tuple.
+ *
+ * @see StreamMultipleTupleExtractor
  */
 public interface StreamTupleExtractor<T, K, V> {
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 0d27af9..c89952d 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -141,7 +141,8 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T,
K, V> {
      * @throws IgniteException If failed.
      */
     public void start() {
-        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.ensure(getTupleExtractor() != null || getMultipleTupleExtractor() != null,
+            "tupleExtractor (single or multiple)");
         A.notNull(getStreamer(), "streamer");
         A.notNull(getIgnite(), "ignite");
         A.ensure(threads > 0, "threads > 0");

http://git-wip-us.apache.org/repos/asf/ignite/blob/4d9734a0/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
index 185599d..8b05754 100644
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -24,6 +24,7 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
@@ -43,6 +44,7 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.stream.StreamMultipleTupleExtractor;
 import org.apache.ignite.stream.StreamTupleExtractor;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.Nullable;
@@ -111,7 +113,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     Marshaller marsh = new JdkMarshaller();
 
                     for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
+                        byte[] msg = marsh.marshal(new Message(i));
 
                         os.write(msg.length >>> 24);
                         os.write(msg.length >>> 16);
@@ -125,21 +127,52 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testMultipleEntriesFromOneMessage() throws Exception {
+        test(null, null, new Runnable() {
+            @Override public void run() {
+                try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                     OutputStream os = new BufferedOutputStream(sock.getOutputStream()))
{
+                    Marshaller marsh = new JdkMarshaller();
+
+                    int[] values = new int[CNT];
+                    for (int i = 0; i < CNT; i++) {
+                        values[i] = i;
+                    }
+
+                    byte[] msg = marsh.marshal(new Message(values));
+
+                    os.write(msg.length >>> 24);
+                    os.write(msg.length >>> 16);
+                    os.write(msg.length >>> 8);
+                    os.write(msg.length);
+
+                    os.write(msg);
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        }, false);
     }
 
     /**
      * @throws Exception If failed.
      */
     public void testSizeBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>()
{
-            @Override public Tuple convert(byte[] msg) {
+        SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>()
{
+            @Override public Message convert(byte[] msg) {
                 int i = (msg[0] & 0xFF) << 24;
                 i |= (msg[1] & 0xFF) << 16;
                 i |= (msg[2] & 0xFF) << 8;
                 i |= msg[3] & 0xFF;
 
-                return new Tuple(i);
+                return new Message(i);
             }
         };
 
@@ -164,7 +197,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
     }
 
     /**
@@ -178,7 +211,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     Marshaller marsh = new JdkMarshaller();
 
                     for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
+                        byte[] msg = marsh.marshal(new Message(i));
 
                         os.write(msg);
                         os.write(DELIM);
@@ -188,7 +221,7 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
 
     }
 
@@ -196,14 +229,14 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     public void testDelimiterBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>()
{
-            @Override public Tuple convert(byte[] msg) {
+        SocketMessageConverter<Message> converter = new SocketMessageConverter<Message>()
{
+            @Override public Message convert(byte[] msg) {
                 int i = (msg[0] & 0xFF) << 24;
                 i |= (msg[1] & 0xFF) << 16;
                 i |= (msg[2] & 0xFF) << 8;
                 i |= msg[3] & 0xFF;
 
-                return new Tuple(i);
+                return new Message(i);
             }
         };
 
@@ -225,16 +258,17 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
                     throw new IgniteException(e);
                 }
             }
-        });
+        }, true);
     }
 
     /**
      * @param converter Converter.
      * @param r Runnable..
      */
-    private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable
byte[] delim, Runnable r) throws Exception
+    private void test(@Nullable SocketMessageConverter<Message> converter, @Nullable
byte[] delim, Runnable r,
+        boolean oneMessagePerTuple) throws Exception
     {
-        SocketStreamer<Tuple, Integer, String> sockStmr = null;
+        SocketStreamer<Message, Integer, String> sockStmr = null;
 
         Ignite ignite = grid(0);
 
@@ -257,11 +291,24 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
 
             sockStmr.setDelimiter(delim);
 
-            sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>()
{
-                @Override public Map.Entry<Integer, String> extract(Tuple msg) {
-                    return new IgniteBiTuple<>(msg.key, msg.val);
-                }
-            });
+            if (oneMessagePerTuple) {
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Message, Integer,
String>() {
+                    @Override public Map.Entry<Integer, String> extract(Message msg)
{
+                        return new IgniteBiTuple<>(msg.key, msg.val);
+                    }
+                });
+            }
+            else {
+                sockStmr.setMultipleTupleExtractor(new StreamMultipleTupleExtractor<Message,
Integer, String>() {
+                    @Override public Map<Integer, String> extract(Message msg) {
+                        Map<Integer, String> answer = new HashMap<>();
+                        for (int value : msg.values) {
+                            answer.put(value, Integer.toString(value));
+                        }
+                        return answer;
+                    }
+                });
+            }
 
             if (converter != null)
                 sockStmr.setConverter(converter);
@@ -297,9 +344,9 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Tuple.
+     * Message.
      */
-    private static class Tuple implements Serializable {
+    private static class Message implements Serializable {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
@@ -309,12 +356,25 @@ public class SocketStreamerSelfTest extends GridCommonAbstractTest {
         /** Value. */
         private final String val;
 
+        /** Multiple values. */
+        private final int[] values;
+
         /**
          * @param key Key.
          */
-        Tuple(int key) {
+        Message(int key) {
             this.key = key;
             this.val = Integer.toString(key);
+            this.values = new int[0];
+        }
+
+        /**
+         * @param values Multiple values.
+         */
+        Message(int[] values) {
+            this.key = -1;
+            this.val = null;
+            this.values = values;
         }
     }
 }
\ No newline at end of file


Mime
View raw message