flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [2/2] flink git commit: [hotfix] Move DataStreamUtils to the datastream API package so that we can actually use it to expose package-private constructors or methods for experimental features.
Date Fri, 09 Feb 2018 17:23:52 GMT
[hotfix] Move DataStreamUtils to the datastream API package so that we can actually use it
to expose package-private constructors or methods for experimental features.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bfe6f84c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bfe6f84c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bfe6f84c

Branch: refs/heads/master
Commit: bfe6f84cf1b99f1dfa801a1818fa51ccc7817c9b
Parents: dea4172
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Wed Feb 7 11:04:14 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Fri Feb 9 18:23:37 2018 +0100

----------------------------------------------------------------------
 .../api/datastream/DataStreamUtils.java         | 115 +++++++++++++++++++
 .../streaming/experimental/CollectSink.java     |   2 +-
 .../streaming/experimental/DataStreamUtils.java | 115 -------------------
 .../experimental/SocketStreamIterator.java      |   6 +-
 .../streaming/api/scala/DataStreamUtils.scala   |  47 ++++++++
 .../experimental/scala/DataStreamUtils.scala    |  48 --------
 .../streaming/experimental/CollectITCase.java   |   2 +-
 7 files changed, 168 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
new file mode 100644
index 0000000..d145d6f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.net.ConnectionUtils;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.experimental.CollectSink;
+import org.apache.flink.streaming.experimental.SocketStreamIterator;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.Iterator;
+
+/**
+ * A collection of experimental utilities for {@link DataStream DataStreams}.
+ *
+ * <p>This experimental class is relocated from flink-streaming-contrib. Please see
package-info.java
+ * for more information.
+ */
+@PublicEvolving
+public final class DataStreamUtils {
+
+	/**
+	 * Returns an iterator to iterate over the elements of the DataStream.
+	 * @return The iterator
+	 */
+	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws
IOException {
+
+		TypeSerializer<OUT> serializer = stream.getType().createSerializer(
+				stream.getExecutionEnvironment().getConfig());
+
+		SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);
+
+		//Find out what IP of us should be given to CollectSink, that it will be able to connect
to
+		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
+		InetAddress clientAddress;
+
+		if (env instanceof RemoteStreamEnvironment) {
+			String host = ((RemoteStreamEnvironment) env).getHost();
+			int port = ((RemoteStreamEnvironment) env).getPort();
+			try {
+				clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port),
2000, 400);
+			}
+			catch (Exception e) {
+				throw new IOException("Could not determine an suitable network address to " +
+						"receive back data from the streaming program.", e);
+			}
+		} else if (env instanceof LocalStreamEnvironment) {
+			clientAddress = InetAddress.getLoopbackAddress();
+		} else {
+			try {
+				clientAddress = InetAddress.getLocalHost();
+			} catch (UnknownHostException e) {
+				throw new IOException("Could not determine this machines own local address to " +
+						"receive back data from the streaming program.", e);
+			}
+		}
+
+		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress,
iter.getPort(), serializer));
+		sink.setParallelism(1); // It would not work if multiple instances would connect to the
same port
+
+		(new CallExecute(env, iter)).start();
+
+		return iter;
+	}
+
+	private static class CallExecute extends Thread {
+
+		private final StreamExecutionEnvironment toTrigger;
+		private final SocketStreamIterator<?> toNotify;
+
+		private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?>
toNotify) {
+			this.toTrigger = toTrigger;
+			this.toNotify = toNotify;
+		}
+
+		@Override
+		public void run(){
+			try {
+				toTrigger.execute();
+			}
+			catch (Throwable t) {
+				toNotify.notifyOfError(t);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private DataStreamUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
index 23b5280..aa0b53b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/CollectSink.java
@@ -35,7 +35,7 @@ import java.net.Socket;
  * for more information.
  */
 @Internal
-class CollectSink<IN> extends RichSinkFunction<IN> {
+public class CollectSink<IN> extends RichSinkFunction<IN> {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
deleted file mode 100644
index 59ad6a8..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/DataStreamUtils.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.experimental;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.net.ConnectionUtils;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSink;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.RemoteStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.util.Iterator;
-
-/**
- * A collection of utilities for {@link DataStream DataStreams}.
- *
- * <p>This experimental class is relocated from flink-streaming-contrib. Please see
package-info.java
- * for more information.
- */
-@PublicEvolving
-public final class DataStreamUtils {
-
-	/**
-	 * Returns an iterator to iterate over the elements of the DataStream.
-	 * @return The iterator
-	 */
-	public static <OUT> Iterator<OUT> collect(DataStream<OUT> stream) throws
IOException {
-
-		TypeSerializer<OUT> serializer = stream.getType().createSerializer(
-				stream.getExecutionEnvironment().getConfig());
-
-		SocketStreamIterator<OUT> iter = new SocketStreamIterator<OUT>(serializer);
-
-		//Find out what IP of us should be given to CollectSink, that it will be able to connect
to
-		StreamExecutionEnvironment env = stream.getExecutionEnvironment();
-		InetAddress clientAddress;
-
-		if (env instanceof RemoteStreamEnvironment) {
-			String host = ((RemoteStreamEnvironment) env).getHost();
-			int port = ((RemoteStreamEnvironment) env).getPort();
-			try {
-				clientAddress = ConnectionUtils.findConnectingAddress(new InetSocketAddress(host, port),
2000, 400);
-			}
-			catch (Exception e) {
-				throw new IOException("Could not determine an suitable network address to " +
-						"receive back data from the streaming program.", e);
-			}
-		} else if (env instanceof LocalStreamEnvironment) {
-			clientAddress = InetAddress.getLoopbackAddress();
-		} else {
-			try {
-				clientAddress = InetAddress.getLocalHost();
-			} catch (UnknownHostException e) {
-				throw new IOException("Could not determine this machines own local address to " +
-						"receive back data from the streaming program.", e);
-			}
-		}
-
-		DataStreamSink<OUT> sink = stream.addSink(new CollectSink<OUT>(clientAddress,
iter.getPort(), serializer));
-		sink.setParallelism(1); // It would not work if multiple instances would connect to the
same port
-
-		(new CallExecute(env, iter)).start();
-
-		return iter;
-	}
-
-	private static class CallExecute extends Thread {
-
-		private final StreamExecutionEnvironment toTrigger;
-		private final SocketStreamIterator<?> toNotify;
-
-		private CallExecute(StreamExecutionEnvironment toTrigger, SocketStreamIterator<?>
toNotify) {
-			this.toTrigger = toTrigger;
-			this.toNotify = toNotify;
-		}
-
-		@Override
-		public void run(){
-			try {
-				toTrigger.execute();
-			}
-			catch (Throwable t) {
-				toNotify.notifyOfError(t);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private DataStreamUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
index 871c0f7..ccb54ed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/experimental/SocketStreamIterator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.experimental;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -42,7 +43,7 @@ import java.util.NoSuchElementException;
  * @param <T> The type of elements returned from the iterator.
  */
 @PublicEvolving
-class SocketStreamIterator<T> implements Iterator<T> {
+public class SocketStreamIterator<T> implements Iterator<T> {
 
 	/** Server socket to listen at. */
 	private final ServerSocket socket;
@@ -62,7 +63,8 @@ class SocketStreamIterator<T> implements Iterator<T> {
 	/** Async error, for example by the executor of the program that produces the stream. */
 	private volatile Throwable error;
 
-	SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
+	@Internal
+	public SocketStreamIterator(TypeSerializer<T> serializer) throws IOException {
 		this.serializer = serializer;
 		try {
 			socket = new ServerSocket(0, 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
new file mode 100644
index 0000000..74dd66a
--- /dev/null
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStreamUtils.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.datastream.{DataStreamUtils => JavaStreamUtils}
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+/**
+  * This class provides simple utility methods for collecting a [[DataStream]],
+  * effectively enriching it with the functionality encapsulated by [[DataStreamUtils]].
+  *
+  * This experimental class is relocated from flink-streaming-contrib.
+  *
+  * @param self DataStream
+  */
+@PublicEvolving
+class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
+
+  /**
+    * Returns a scala iterator to iterate over the elements of the DataStream.
+    * @return The iterator
+    */
+  def collect() : Iterator[T] = {
+    JavaStreamUtils.collect(self.javaStream).asScala
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
deleted file mode 100644
index 8c4beff..0000000
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/experimental/scala/DataStreamUtils.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.experimental.scala
-
-import org.apache.flink.annotation.PublicEvolving
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
-import org.apache.flink.streaming.experimental.{DataStreamUtils => JavaStreamUtils}
-
-import scala.collection.JavaConverters._
-import scala.reflect.ClassTag
-
-/**
-  * This class provides simple utility methods for collecting a [[DataStream]],
-  * effectively enriching it with the functionality encapsulated by [[DataStreamUtils]].
-  *
-  * This experimental class is relocated from flink-streaming-contrib.
-  *
-  * @param self DataStream
-  */
-@PublicEvolving
-class DataStreamUtils[T: TypeInformation : ClassTag](val self: DataStream[T]) {
-
-  /**
-    * Returns a scala iterator to iterate over the elements of the DataStream.
-    * @return The iterator
-    */
-  def collect() : Iterator[T] = {
-    JavaStreamUtils.collect(self.javaStream).asScala
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/bfe6f84c/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
index ad07390..0535bf7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/experimental/CollectITCase.java
@@ -21,8 +21,8 @@ package org.apache.flink.test.streaming.experimental;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.experimental.DataStreamUtils;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.TestLogger;
 


Mime
View raw message