flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [2/4] flink git commit: [FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API
Date Sun, 22 Mar 2015 17:35:26 GMT
[FLINK-1688] [streaming] [api-extending] Socket Client Sink added to the DataStream API

Moved Serialization Schemas from connectors to core
Minor cleanups for Socket Client Sink
This closes #484


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11796495
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11796495
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11796495

Branch: refs/heads/master
Commit: 11796495762893f2c8b1f744321d0c4b5db0b881
Parents: a3bc785
Author: mbalassi <mbalassi@apache.org>
Authored: Sun Mar 22 10:27:30 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Sun Mar 22 12:11:05 2015 +0100

----------------------------------------------------------------------
 .../flink-streaming-connectors/pom.xml          |   2 +-
 .../streaming/connectors/ConnectorSource.java   |   2 +-
 .../streaming/connectors/flume/FlumeSink.java   |   8 +-
 .../streaming/connectors/flume/FlumeSource.java |   2 +-
 .../connectors/flume/FlumeTopology.java         |   4 +-
 .../connectors/kafka/KafkaConsumerExample.java  |   2 +-
 .../connectors/kafka/KafkaProducerExample.java  |   2 +-
 .../kafka/KafkaSimpleConsumerExample.java       |   2 +-
 .../connectors/kafka/api/KafkaSink.java         |  10 +-
 .../connectors/kafka/api/KafkaSource.java       |   2 +-
 .../kafka/api/simple/KafkaTopicUtils.java       |   2 -
 .../kafka/api/simple/PersistentKafkaSource.java |   2 +-
 .../streaming/connectors/rabbitmq/RMQSink.java  |   8 +-
 .../connectors/rabbitmq/RMQSource.java          |   2 +-
 .../connectors/rabbitmq/RMQTopology.java        |   4 +-
 .../connectors/socket/SocketClientSink.java     | 141 -------------------
 .../connectors/util/DeserializationSchema.java  |  42 ------
 .../util/JavaDefaultStringSchema.java           |  41 ------
 .../streaming/connectors/util/RawSchema.java    |  39 -----
 .../connectors/util/SerializationSchema.java    |  33 -----
 .../connectors/util/SimpleStringSchema.java     |  40 ------
 .../streaming/connectors/kafka/KafkaITCase.java |   2 +-
 .../streaming/api/datastream/DataStream.java    |  16 +++
 .../api/function/sink/SocketClientSink.java     | 138 ++++++++++++++++++
 .../serialization/DeserializationSchema.java    |  42 ++++++
 .../serialization/JavaDefaultStringSchema.java  |  41 ++++++
 .../streaming/util/serialization/RawSchema.java |  39 +++++
 .../util/serialization/SerializationSchema.java |  33 +++++
 .../util/serialization/SimpleStringSchema.java  |  40 ++++++
 .../flink/streaming/api/scala/DataStream.scala  |   8 ++
 30 files changed, 384 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
index 4408ba0..8155100 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/pom.xml
@@ -35,7 +35,7 @@ under the License.
 
 	<packaging>jar</packaging>
 
-	<!-- Allow users to pass custom kafka versions -->
+	<!-- Allow users to pass custom connector versions -->
 	<properties>
 		<kafka.version>0.8.2.0</kafka.version>
 		<rabbitmq.version>3.3.1</rabbitmq.version>

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
index 1623943..a7b0b06 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/ConnectorSource.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
 import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 
 public abstract class ConnectorSource<OUT> extends RichParallelSourceFunction<OUT> implements
 		GenericSourceFunction<OUT> {

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
index 86fd1b1..27074ec 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSink.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.flume;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -39,12 +39,12 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	boolean initDone = false;
 	String host;
 	int port;
-	SerializationSchema<IN, byte[]> scheme;
+	SerializationSchema<IN, byte[]> schema;
 
 	public FlumeSink(String host, int port, SerializationSchema<IN, byte[]> schema) {
 		this.host = host;
 		this.port = port;
-		this.scheme = schema;
+		this.schema = schema;
 	}
 
 	/**
@@ -57,7 +57,7 @@ public class FlumeSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) {
 
-		byte[] data = scheme.serialize(value);
+		byte[] data = schema.serialize(value);
 		client.sendDataToFlume(data);
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
index 2a321a2..00661ae 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeSource.java
@@ -21,7 +21,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.apache.flume.Context;
 import org.apache.flume.channel.ChannelProcessor;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
index 3cfd7d4..7c979d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/flume/FlumeTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.flume;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 public class FlumeTopology {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
index dd1221d..6a95b0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 
 public class KafkaConsumerExample {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
index 1fe759a..e7abf11 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerExample.java
@@ -21,7 +21,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 
 public class KafkaProducerExample {

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
index 47c5a33..2fd31ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSimpleConsumerExample.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 
 public class KafkaSimpleConsumerExample {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 1753561..be9eb57 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -25,7 +25,7 @@ import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
 import org.apache.flink.streaming.connectors.kafka.api.config.PartitionerWrapper;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
 
 import com.google.common.base.Preconditions;
@@ -50,7 +50,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private Properties props;
 	private String topicId;
 	private String zookeeperAddress;
-	private SerializationSchema<IN, byte[]> scheme;
+	private SerializationSchema<IN, byte[]> schema;
 	private SerializableKafkaPartitioner partitioner;
 	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
 
@@ -91,7 +91,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
-		this.scheme = serializationSchema;
+		this.schema = serializationSchema;
 		this.partitioner = partitioner;
 	}
 
@@ -103,7 +103,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
-		this.scheme = serializationSchema;
+		this.schema = serializationSchema;
 		this.partitionerClass = partitioner;
 	}
 
@@ -150,7 +150,7 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 */
 	@Override
 	public void invoke(IN next) {
-		byte[] serialized = scheme.serialize(next);
+		byte[] serialized = schema.serialize(next);
 		producer.send(new KeyedMessage<IN, byte[]>(topicId, next, serialized));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
index 4eff870..ae6c169 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSource.java
@@ -31,7 +31,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
index 7a10ed3..9e09ea8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
@@ -82,8 +82,6 @@ public class KafkaTopicUtils {
 
 		Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
 
-		// TODO for Kafka version 8.2.0
-		//		return leader.connectionString();
 		return leader.connectionString();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 8ec298a..97225dc 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffs
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.GivenOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
index dae9c6d..48f5e60 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +39,12 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	private transient ConnectionFactory factory;
 	private transient Connection connection;
 	private transient Channel channel;
-	private SerializationSchema<IN, byte[]> scheme;
+	private SerializationSchema<IN, byte[]> schema;
 
 	public RMQSink(String HOST_NAME, String QUEUE_NAME, SerializationSchema<IN, byte[]> schema) {
 		this.HOST_NAME = HOST_NAME;
 		this.QUEUE_NAME = QUEUE_NAME;
-		this.scheme = schema;
+		this.schema = schema;
 	}
 
 	/**
@@ -72,7 +72,7 @@ public class RMQSink<IN> extends RichSinkFunction<IN> {
 	@Override
 	public void invoke(IN value) {
 		try {
-			byte[] msg = scheme.serialize(value);
+			byte[] msg = schema.serialize(value);
 
 			channel.basicPublish("", QUEUE_NAME, null, msg);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
index 12ad3d6..03b6d10 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.connectors.ConnectorSource;
-import org.apache.flink.streaming.connectors.util.DeserializationSchema;
+import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
index a6ca9ae..0f06235 100755
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQTopology.java
@@ -19,8 +19,8 @@ package org.apache.flink.streaming.connectors.rabbitmq;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.apache.flink.streaming.connectors.util.SimpleStringSchema;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 
 public class RMQTopology {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
deleted file mode 100644
index 72ba4f2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/socket/SocketClientSink.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *	http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.socket;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.function.sink.RichSinkFunction;
-import org.apache.flink.streaming.connectors.util.SerializationSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Socket client that acts as a streaming sink. The data is sent to a Socket.
- *
- * @param <IN> data to be written into the Socket.
- */
-public class SocketClientSink<IN> extends RichSinkFunction<IN> {
-	private static final long serialVersionUID = 1L;
-
-    /**
-     * Class logger
-     */
-	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
-
-	private final String hostName;
-	private final int port;
-	private final SerializationSchema<IN, byte[]> scheme;
-	private transient Socket client;
-	private transient DataOutputStream dataOutputStream;
-
-    /**
-     * Default constructor.
-     *
-     * @param hostName Host of the Socket server.
-     * @param port Port of the Socket.
-     * @param schema Schema of the data.
-     */
-	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
-		this.hostName = hostName;
-		this.port = port;
-		this.scheme = schema;
-	}
-
-	/**
-	 * Initializes the connection to Socket.
-	 */
-	public void intializeConnection() {
-		OutputStream outputStream;
-		try {
-			client = new Socket(hostName, port);
-			outputStream = client.getOutputStream();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-		dataOutputStream = new DataOutputStream(outputStream);
-	}
-
-	/**
-	 * Called when new data arrives to the sink, and forwards it to Socket.
-	 *
-	 * @param value
-	 *			The incoming data
-	 */
-	@Override
-	public void invoke(IN value) {
-		byte[] msg = scheme.serialize(value);
-		try {
-			dataOutputStream.write(msg);
-		} catch (IOException e) {
-			if(LOG.isErrorEnabled()){
-				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
-			}
-		}
-	}
-
-	/**
-	 * Closes the connection of the Socket client.
-	 */
-	private void closeConnection(){
-		try {
-			client.close();
-		} catch (IOException e) {
-			throw new RuntimeException("Error while closing connection with socket server at "
-					+ hostName + ":" + port, e);
-		} finally {
-            if (client != null) {
-                try {
-                    client.close();
-                } catch (IOException e) {
-                    LOG.error("Cannot close connection with socket server at "
-                            + hostName + ":" + port, e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Initialize the connection with the Socket in the server.
-     * @param parameters Configuration.
-     */
-	@Override
-	public void open(Configuration parameters) {
-		intializeConnection();
-	}
-
-    /**
-     * Closes the connection with the Socket server.
-     */
-	@Override
-	public void close() {
-		closeConnection();
-	}
-
-    /**
-     * Closes the connection with the Socket server.
-     */
-	@Override
-	public void cancel() {
-		close();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
deleted file mode 100644
index 4507a1d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/DeserializationSchema.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface DeserializationSchema<T> extends Serializable {
-
-	/**
-	 * Deserializes the incoming data.
-	 * 
-	 * @param message
-	 *            The incoming message in a byte array
-	 * @return The deserialized message in the required format.
-	 */
-	public T deserialize(byte[] message);
-
-	/**
-	 * Method to decide whether the element signals the end of the stream. If
-	 * true is returned the element won't be emitted
-	 * 
-	 * @param nextElement
-	 *            The element to test for end signal
-	 * @return The end signal, if true the stream shuts down
-	 */
-	public boolean isEndOfStream(T nextElement);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
deleted file mode 100644
index 569d3e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/JavaDefaultStringSchema.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import org.apache.commons.lang3.SerializationUtils;
-
-public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return nextElement.equals("q");
-	}
-
-	@Override
-	public byte[] serialize(String element) {
-		return SerializationUtils.serialize(element);
-	}
-
-	@Override
-	public String deserialize(byte[] message) {
-		return SerializationUtils.deserialize(message);
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
deleted file mode 100644
index 29c749a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/RawSchema.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class RawSchema implements DeserializationSchema<byte[]>,
-		SerializationSchema<byte[], byte[]> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public byte[] deserialize(byte[] message) {
-		return message;
-	}
-
-	@Override
-	public boolean isEndOfStream(byte[] nextElement) {
-		return false;
-	}
-
-	@Override
-	public byte[] serialize(byte[] element) {
-		return element;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
deleted file mode 100644
index f8d2b2f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SerializationSchema.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-import java.io.Serializable;
-
-public interface SerializationSchema<T, R> extends Serializable {
-
-	/**
-	 * Serializes the incoming element to a specified type.
-	 * 
-	 * @param element
-	 *            The incoming element to be serialized
-	 * @return The serialized element.
-	 */
-	public R serialize(T element);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
deleted file mode 100644
index 4b21580..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/util/SimpleStringSchema.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.util;
-
-public class SimpleStringSchema implements DeserializationSchema<String>,
-		SerializationSchema<String, String> {
-
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public String deserialize(byte[] message) {
-		return new String(message);
-	}
-
-	@Override
-	public boolean isEndOfStream(String nextElement) {
-		return false;
-	}
-
-	@Override
-	public String serialize(String element) {
-		return element;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 134525d..ab786ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -36,7 +36,7 @@ import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
-import org.apache.flink.streaming.connectors.util.JavaDefaultStringSchema;
+import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b62a6d8..5768101 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,6 +59,7 @@ import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
 import org.apache.flink.streaming.api.function.sink.FileSinkFunctionByMillis;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.function.sink.SocketClientSink;
 import org.apache.flink.streaming.api.invokable.SinkInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.operator.CounterInvokable;
@@ -80,6 +81,7 @@ import org.apache.flink.streaming.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.partitioner.ShufflePartitioner;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
 /**
  * A DataStream represents a stream of elements of the same type. A DataStream
@@ -1115,6 +1117,20 @@ public class DataStream<OUT> {
 		return writeToFile((OutputFormat<OUT>) of, millis);
 	}
 
+	/**
+	 * Writes the DataStream to a socket as a byte array. The format of the output is
+	 * specified by a {@link SerializationSchema}.
+	 *
+	 * @param hostName host of the socket
+	 * @param port port of the socket
+	 * @param schema schema for serialization
+	 * @return the closed DataStream
+	 */
+	public DataStreamSink<OUT> writeToSocket(String hostName, int port, SerializationSchema<OUT, byte[]> schema){
+		DataStreamSink<OUT> returnStream = addSink(new SocketClientSink<OUT>(hostName, port, schema));
+		return returnStream;
+	}
+
 	private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
 		DataStreamSink<OUT> returnStream = addSink(new FileSinkFunctionByMillis<OUT>(format, millis));
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
new file mode 100644
index 0000000..6ebcf46
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/sink/SocketClientSink.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.function.sink;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Socket client that acts as a streaming sink. The data is sent to a Socket as a byte array.
+ *
+ * @param <IN> data to be written into the Socket.
+ */
+public class SocketClientSink<IN> extends RichSinkFunction<IN> {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(SocketClientSink.class);
+
+	private final String hostName;
+	private final int port;
+	private final SerializationSchema<IN, byte[]> schema;
+	private transient Socket client;
+	private transient DataOutputStream dataOutputStream;
+
+	/**
+	 * Default constructor.
+	 *
+	 * @param hostName Host of the Socket server.
+	 * @param port Port of the Socket.
+	 * @param schema Schema of the data.
+	 */
+	public SocketClientSink(String hostName, int port, SerializationSchema<IN, byte[]> schema) {
+		this.hostName = hostName;
+		this.port = port;
+		this.schema = schema;
+	}
+
+	/**
+	 * Initializes the connection to Socket.
+	 */
+	public void intializeConnection() {
+		OutputStream outputStream;
+		try {
+			client = new Socket(hostName, port);
+			outputStream = client.getOutputStream();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+		dataOutputStream = new DataOutputStream(outputStream);
+	}
+
+	/**
+	 * Called when new data arrives to the sink, and forwards it to Socket.
+	 *
+	 * @param value
+	 *			The incoming data
+	 */
+	@Override
+	public void invoke(IN value) {
+		byte[] msg = schema.serialize(value);
+		try {
+			dataOutputStream.write(msg);
+		} catch (IOException e) {
+			if(LOG.isErrorEnabled()){
+				LOG.error("Cannot send message to socket server at " + hostName + ":" + port, e);
+			}
+		}
+	}
+
+	/**
+	 * Closes the connection of the Socket client.
+	 */
+	private void closeConnection(){
+		try {
+			dataOutputStream.flush();
+			client.close();
+		} catch (IOException e) {
+			throw new RuntimeException("Error while closing connection with socket server at "
+					+ hostName + ":" + port, e);
+		} finally {
+			if (client != null) {
+				try {
+					client.close();
+				} catch (IOException e) {
+					LOG.error("Cannot close connection with socket server at "
+							+ hostName + ":" + port, e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Initialize the connection with the Socket in the server.
+	 * @param parameters Configuration.
+	 */
+	@Override
+	public void open(Configuration parameters) {
+		intializeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void close() {
+		closeConnection();
+	}
+
+	/**
+	 * Closes the connection with the Socket server.
+	 */
+	@Override
+	public void cancel() {
+		close();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
new file mode 100644
index 0000000..08ef461
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+public interface DeserializationSchema<T> extends Serializable {
+
+	/**
+	 * Deserializes the incoming data.
+	 * 
+	 * @param message
+	 *            The incoming message in a byte array
+	 * @return The deserialized message in the required format.
+	 */
+	public T deserialize(byte[] message);
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted
+	 * 
+	 * @param nextElement
+	 *            The element to test for end signal
+	 * @return The end signal, if true the stream shuts down
+	 */
+	public boolean isEndOfStream(T nextElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
new file mode 100644
index 0000000..93d13ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/JavaDefaultStringSchema.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.commons.lang3.SerializationUtils;
+
+public class JavaDefaultStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return nextElement.equals("q");
+	}
+
+	@Override
+	public byte[] serialize(String element) {
+		return SerializationUtils.serialize(element);
+	}
+
+	@Override
+	public String deserialize(byte[] message) {
+		return SerializationUtils.deserialize(message);
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
new file mode 100644
index 0000000..e457bef
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/RawSchema.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+public class RawSchema implements DeserializationSchema<byte[]>,
+		SerializationSchema<byte[], byte[]> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public byte[] deserialize(byte[] message) {
+		return message;
+	}
+
+	@Override
+	public boolean isEndOfStream(byte[] nextElement) {
+		return false;
+	}
+
+	@Override
+	public byte[] serialize(byte[] element) {
+		return element;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
new file mode 100644
index 0000000..8124eb0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+public interface SerializationSchema<T, R> extends Serializable {
+
+	/**
+	 * Serializes the incoming element to a specified type.
+	 * 
+	 * @param element
+	 *            The incoming element to be serialized
+	 * @return The serialized element.
+	 */
+	public R serialize(T element);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
new file mode 100644
index 0000000..3d0a0d5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+public class SimpleStringSchema implements DeserializationSchema<String>,
+		SerializationSchema<String, String> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public String deserialize(byte[] message) {
+		return new String(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(String nextElement) {
+		return false;
+	}
+
+	@Override
+	public String serialize(String element) {
+		return element;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/11796495/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 3dc54d6..7d77b5b 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream,
   SingleOutputStreamOperator, GroupedDataStream}
+import org.apache.flink.streaming.util.serialization.SerializationSchema
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -605,6 +606,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.writeAsCsv(path, millis)
 
   /**
+   * Writes the DataStream to a socket as a byte array. The format of the output is
+   * specified by a {@link SerializationSchema}.
+   */
+  def writeToSocket(hostname: String, port: Integer, schema: SerializationSchema[T, Array[Byte]]):
+    DataStream[T] = javaStream.writeToSocket(hostname, port, schema)
+
+  /**
    * Adds the given sink to this DataStream. Only streams with sinks added
    * will be executed once the StreamExecutionEnvironment.execute(...)
    * method is called.


Mime
View raw message