flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/4] flink git commit: [FLINK-1688] [streaming] Socket client sink added
Date Sun, 22 Mar 2015 17:35:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master 244e5d5f8 -> 35f34162a


[FLINK-1688] [streaming] Socket client sink added


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3bc7852
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3bc7852
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3bc7852

Branch: refs/heads/master
Commit: a3bc785232d3071ac5cb76ab20db9c30a3d2a22a
Parents: 244e5d5
Author: miguel0afd <mafernandez@stratio.com>
Authored: Sat Mar 14 23:54:38 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Mar 22 08:46:23 2015 +0100

----------------------------------------------------------------------
 .../connectors/socket/SocketClientSink.java     | 141 +++++++++++++++++++
 1 file changed, 141 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a3bc7852/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
new file mode 100644
index 0000000..72ba4f2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.socket;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
+import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+    /**
+     * Class logger
+     */
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+	private final String hostName;
+	private final int port;
+	private final SerializationSchema<IN, byte[]> scheme;
+	private transient Socket client;
+	private transient DataOutputStream dataOutputStream;
+
+    /**
+     * Default constructor.
+     *
+     * @param hostName Host of the Socket server.
+     * @param port Port of the Socket.
+     * @param schema Schema of the data.
+     */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]>
schema) {
+		this.hostName = hostName;
+		this.port = port;
+		this.scheme = schema;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 */
+	public void intializeConnection() {
+		OutputStream outputStream;
+		try {
+			client = new Socket(hostName, port);
+			outputStream = client.getOutputStream();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		dataOutputStream = new DataOutputStream(outputStream);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		byte[] msg = scheme.serialize(value);
+		try {
+			dataOutputStream.write(msg);
+		} catch (IOException e) {
+			if(LOG.isErrorEnabled()){
+				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 */
+	private void closeConnection(){
+		try {
+			client.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostName + ":" + port, e);
+		} finally {
+            if (client != null) {
+                try {
+                    client.close();
+                } catch (IOException e) {
+                    LOG.error("Cannot close connection with socket server at "
+                            + hostName + ":" + port, e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Initialize the connection with the Socket in the server.
+     * @param parameters Configuration.
+     */
+	@Override
+	public void open(Configuration parameters) {
+		intializeConnection();
+	}
+
+    /**
+     * Closes the connection with the Socket server.
+     */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+    /**
+     * Closes the connection with the Socket server.
+     */
+	@Override
+	public void cancel() {
+		close();
+	}
+
+}


Mime
View raw message