flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [28/37] flink git commit: [FLINK-6086] [py] Clean up PythonSender/-Streamer generics
Date Thu, 06 Apr 2017 07:28:41 GMT
[FLINK-6086] [py] Clean up PythonSender/-Streamer generics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7251c56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7251c56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7251c56

Branch: refs/heads/table-retraction
Commit: a7251c56d0d03293f95f0e3b084326a96fe50ce3
Parents: 9bdbe60
Author: zentol <chesnay@apache.org>
Authored: Thu Mar 16 23:26:09 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Wed Apr 5 20:43:30 2017 +0200

----------------------------------------------------------------------
 .../python/api/functions/PythonCoGroup.java     |   6 +-
 .../api/functions/PythonMapPartition.java       |   6 +-
 .../streaming/data/PythonDualInputSender.java   |  70 ++++++++++
 .../streaming/data/PythonDualInputStreamer.java |  97 ++++++++++++++
 .../api/streaming/data/PythonReceiver.java      |   2 +
 .../python/api/streaming/data/PythonSender.java | 100 +++------------
 .../streaming/data/PythonSingleInputSender.java |  50 ++++++++
 .../data/PythonSingleInputStreamer.java         |  89 +++++++++++++
 .../api/streaming/data/PythonStreamer.java      | 128 ++-----------------
 .../data/SingleElementPushBackIterator.java     |  57 +++++++++
 .../data/SingleElementPushBackIteratorTest.java |  76 +++++++++++
 11 files changed, 479 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 2065b98..72d3361 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -14,7 +14,7 @@ package org.apache.flink.python.api.functions;
 
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.data.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonDualInputStreamer;
 import org.apache.flink.util.Collector;
 import java.io.IOException;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
@@ -31,12 +31,12 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2,
 
 	private static final long serialVersionUID = -3997396583317513873L;
 
-	private final PythonStreamer<IN1, IN2, OUT> streamer;
+	private final PythonDualInputStreamer<IN1, IN2, OUT> streamer;
 	private final transient TypeInformation<OUT> typeInformation;
 
 	public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer<>(this, envID, setID, true);
+		streamer = new PythonDualInputStreamer<>(this, envID, setID, true);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index dc21c7c..9142581 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -18,7 +18,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.python.api.streaming.data.PythonStreamer;
+import org.apache.flink.python.api.streaming.data.PythonSingleInputStreamer;
 import org.apache.flink.util.Collector;
 
 /**
@@ -32,12 +32,12 @@ public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OU
 
 	private static final long serialVersionUID = 3866306483023916413L;
 
-	private final PythonStreamer<IN, IN, OUT> streamer;
+	private final PythonSingleInputStreamer<IN, OUT> streamer;
 	private final transient TypeInformation<OUT> typeInformation;
 
 	public PythonMapPartition(int envId, int setId, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
+		streamer = new PythonSingleInputStreamer<>(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
new file mode 100644
index 0000000..3b8e423
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputSender.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link PythonSender} for operations with two input streams.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ */
+public class PythonDualInputSender<IN1, IN2> extends PythonSender {
+
+	private static final long serialVersionUID = 614115041181108878L;
+
+	private transient Serializer<IN1> serializer1;
+	private transient Serializer<IN2> serializer2;
+
+	/**
+	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
+	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
+	 * guarantee that the file may be written to before calling this method.
+	 *
+	 * @param input iterator containing records
+	 * @return size of the written buffer
+	 * @throws IOException
+	 */
+	public int sendBuffer1(SingleElementPushBackIterator<IN1> input) throws IOException {
+		if (serializer1 == null) {
+			IN1 value = input.next();
+			serializer1 = getSerializer(value);
+			input.pushBack(value);
+		}
+		return sendBuffer(input, serializer1);
+	}
+
+	/**
+	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
+	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
+	 * guarantee that the file may be written to before calling this method.
+	 *
+	 * @param input iterator containing records
+	 * @return size of the written buffer
+	 * @throws IOException
+	 */
+	public int sendBuffer2(SingleElementPushBackIterator<IN2> input) throws IOException {
+		if (serializer2 == null) {
+			IN2 value = input.next();
+			serializer2 = getSerializer(value);
+			input.pushBack(value);
+		}
+		return sendBuffer(input, serializer2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
new file mode 100644
index 0000000..2e9ba2c
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonDualInputStreamer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+
+/**
+ * This class is a {@link PythonStreamer} for operations with two input stream.
+ *
+ * @param <IN1> first input type
+ * @param <IN2> second input type
+ * @param <OUT> output type
+ */
+public class PythonDualInputStreamer<IN1, IN2, OUT> extends PythonStreamer<PythonDualInputSender<IN1, IN2>, OUT> {
+
+	private static final long serialVersionUID = -607175070491761873L;
+
+	public PythonDualInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
+		super(function, envID, setID, usesByteArray, new PythonDualInputSender<IN1, IN2>());
+	}
+
+	/**
+	 * Sends all values contained in both iterators to the external process and collects all results.
+	 *
+	 * @param iterator1 first input stream
+	 * @param iterator2 second input stream
+	 * @param c         collector
+	 * @throws IOException
+	 */
+	public final void streamBufferWithGroups(Iterator<IN1> iterator1, Iterator<IN2> iterator2, Collector<OUT> c) throws IOException {
+		SingleElementPushBackIterator<IN1> i1 = new SingleElementPushBackIterator<>(iterator1);
+		SingleElementPushBackIterator<IN2> i2 = new SingleElementPushBackIterator<>(iterator2);
+		try {
+			int size;
+			if (i1.hasNext() || i2.hasNext()) {
+				while (true) {
+					int sig = in.readInt();
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST_G0:
+							if (i1.hasNext()) {
+								size = sender.sendBuffer1(i1);
+								sendWriteNotification(size, i1.hasNext());
+							}
+							break;
+						case SIGNAL_BUFFER_REQUEST_G1:
+							if (i2.hasNext()) {
+								size = sender.sendBuffer2(i2);
+								sendWriteNotification(size, i2.hasNext());
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try {
+								outPrinter.join();
+							} catch (InterruptedException e) {
+								outPrinter.interrupt();
+							}
+							try {
+								errorPrinter.join();
+							} catch (InterruptedException e) {
+								errorPrinter.interrupt();
+							}
+							throw new RuntimeException(
+								"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ignored) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
index ba5d96a..838a261 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java
@@ -44,6 +44,8 @@ public class PythonReceiver<OUT> implements Serializable {
 	}
 
 	//=====Setup========================================================================================================
+
+	@SuppressWarnings("unchecked")
 	public void open(String path) throws IOException {
 		setupMappedFile(path);
 		deserializer = (Deserializer<OUT>) (readAsByteArray ? new ByteArrayDeserializer() : new TupleDeserializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
index 8c40a6f..9ada758 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSender.java
@@ -12,6 +12,9 @@
  */
 package org.apache.flink.python.api.streaming.data;
 
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
@@ -19,16 +22,14 @@ import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Iterator;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
+
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_TMP_DATA_DIR;
 import static org.apache.flink.python.api.PythonPlanBinder.MAPPED_FILE_SIZE;
 
 /**
  * General-purpose class to write data to memory-mapped files.
  */
-public class PythonSender implements Serializable {
+public abstract class PythonSender implements Serializable {
 
 	private static final long serialVersionUID = -2004095650353962110L;
 
@@ -41,14 +42,8 @@ public class PythonSender implements Serializable {
 	private transient FileChannel outputChannel;
 	private transient MappedByteBuffer fileBuffer;
 
-	private transient ByteBuffer[] saved;
-
-	private transient Serializer[] serializer;
-
 	//=====Setup========================================================================================================
 	public void open(String path) throws IOException {
-		saved = new ByteBuffer[2];
-		serializer = new Serializer[2];
 		setupMappedFile(path);
 	}
 
@@ -80,88 +75,31 @@ public class PythonSender implements Serializable {
 		outputFile.delete();
 	}
 
-	/**
-	 * Resets this object to the post-configuration state.
-	 */
-	public void reset() {
-		serializer[0] = null;
-		serializer[1] = null;
-		fileBuffer.clear();
-	}
-
 	//=====IO===========================================================================================================
 	/**
-	 * Writes a single record to the memory-mapped file. This method does NOT take care of synchronization. The user
-	 * must guarantee that the file may be written to before calling this method. This method essentially reserves the
-	 * whole buffer for one record. As such it imposes some performance restrictions and should only be used when
-	 * absolutely necessary.
-	 *
-	 * @param value record to send
-	 * @return size of the written buffer
-	 * @throws IOException
-	 */
-	@SuppressWarnings("unchecked")
-	public int sendRecord(Object value) throws IOException {
-		fileBuffer.clear();
-		int group = 0;
-
-		serializer[group] = getSerializer(value);
-		ByteBuffer bb = serializer[group].serialize(value);
-		if (bb.remaining() > MAPPED_FILE_SIZE) {
-			throw new RuntimeException("Serialized object does not fit into a single buffer.");
-		}
-		fileBuffer.put(bb);
-
-		int size = fileBuffer.position();
-
-		reset();
-		return size;
-	}
-
-	public boolean hasRemaining(int group) {
-		return saved[group] != null;
-	}
-
-	/**
 	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
 	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
 	 * guarantee that the file may be written to before calling this method.
 	 *
-	 * @param i iterator containing records
-	 * @param group group to which the iterator belongs, most notably used by CoGroup-functions.
+	 * @param input     iterator containing records
+	 * @param serializer serializer for the input records
 	 * @return size of the written buffer
 	 * @throws IOException
 	 */
-	@SuppressWarnings("unchecked")
-	public int sendBuffer(Iterator<?> i, int group) throws IOException {
+	protected <IN> int sendBuffer(SingleElementPushBackIterator<IN> input, Serializer<IN> serializer) throws IOException {
 		fileBuffer.clear();
 
-		Object value;
-		ByteBuffer bb;
-		if (serializer[group] == null) {
-			value = i.next();
-			serializer[group] = getSerializer(value);
-			bb = serializer[group].serialize(value);
-			if (bb.remaining() > MAPPED_FILE_SIZE) {
-				throw new RuntimeException("Serialized object does not fit into a single buffer.");
-			}
-			fileBuffer.put(bb);
-
-		}
-		if (saved[group] != null) {
-			fileBuffer.put(saved[group]);
-			saved[group] = null;
-		}
-		while (i.hasNext() && saved[group] == null) {
-			value = i.next();
-			bb = serializer[group].serialize(value);
+		while (input.hasNext()) {
+			IN value = input.next();
+			ByteBuffer bb = serializer.serialize(value);
 			if (bb.remaining() > MAPPED_FILE_SIZE) {
 				throw new RuntimeException("Serialized object does not fit into a single buffer.");
 			}
 			if (bb.remaining() <= fileBuffer.remaining()) {
 				fileBuffer.put(bb);
 			} else {
-				saved[group] = bb;
+				input.pushBack(value);
+				break;
 			}
 		}
 
@@ -170,20 +108,22 @@ public class PythonSender implements Serializable {
 	}
 
 	//=====Serializer===================================================================================================
-	private Serializer<?> getSerializer(Object value) {
+
+	@SuppressWarnings("unchecked")
+	protected <IN> Serializer<IN> getSerializer(IN value) {
 		if (value instanceof byte[]) {
-			return new ArraySerializer();
+			return (Serializer<IN>) new ArraySerializer();
 		}
 		if (((Tuple2<?, ?>) value).f0 instanceof byte[]) {
-			return new ValuePairSerializer();
+			return (Serializer<IN>) new ValuePairSerializer();
 		}
 		if (((Tuple2<?, ?>) value).f0 instanceof Tuple) {
-			return new KeyValuePairSerializer();
+			return (Serializer<IN>) new KeyValuePairSerializer();
 		}
 		throw new IllegalArgumentException("This object can't be serialized: " + value);
 	}
 
-	private abstract static class Serializer<T> {
+	protected abstract static class Serializer<T> {
 		protected ByteBuffer buffer;
 
 		public ByteBuffer serialize(T value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
new file mode 100644
index 0000000..42a1799
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputSender.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import java.io.IOException;
+
+/**
+ * This class is a {@link PythonSender} for operations with one input stream.
+ *
+ * @param <IN> input type
+ */
+public class PythonSingleInputSender<IN> extends PythonSender {
+
+	private static final long serialVersionUID = 614115041181108878L;
+
+	private transient Serializer<IN> serializer;
+
+	/**
+	 * Extracts records from an iterator and writes them to the memory-mapped file. This method assumes that all values
+	 * in the iterator are of the same type. This method does NOT take care of synchronization. The caller must
+	 * guarantee that the file may be written to before calling this method.
+	 *
+	 * @param input iterator containing records
+	 * @return size of the written buffer
+	 * @throws IOException
+	 */
+	public int sendBuffer(SingleElementPushBackIterator<IN> input) throws IOException {
+		if (serializer == null) {
+			IN value = input.next();
+			serializer = getSerializer(value);
+			input.pushBack(value);
+		}
+		return sendBuffer(input, serializer);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
new file mode 100644
index 0000000..d013111
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonSingleInputStreamer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.Iterator;
+
+/**
+ * This class is a {@link PythonStreamer} for operations with one input stream.
+ * @param <IN> input type
+ * @param <OUT> output type
+ */
+public class PythonSingleInputStreamer<IN, OUT> extends PythonStreamer<PythonSingleInputSender<IN>, OUT> {
+
+	private static final long serialVersionUID = -5149905918522069034L;
+
+	public PythonSingleInputStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
+		super(function, envID, setID, usesByteArray, new PythonSingleInputSender<IN>());
+	}
+
+	/**
+	 * Sends all values contained in the iterator to the external process and collects all results.
+	 *
+	 * @param iterator input stream
+	 * @param c        collector
+	 * @throws IOException
+	 */
+	public final void streamBufferWithoutGroups(Iterator<IN> iterator, Collector<OUT> c) throws IOException {
+		SingleElementPushBackIterator<IN> i = new SingleElementPushBackIterator<>(iterator);
+		try {
+			int size;
+			if (i.hasNext()) {
+				while (true) {
+					int sig = in.readInt();
+					switch (sig) {
+						case SIGNAL_BUFFER_REQUEST:
+							if (i.hasNext()) {
+								size = sender.sendBuffer(i);
+								sendWriteNotification(size, i.hasNext());
+							} else {
+								throw new RuntimeException("External process requested data even though none is available.");
+							}
+							break;
+						case SIGNAL_FINISHED:
+							return;
+						case SIGNAL_ERROR:
+							try {
+								outPrinter.join();
+							} catch (InterruptedException e) {
+								outPrinter.interrupt();
+							}
+							try {
+								errorPrinter.join();
+							} catch (InterruptedException e) {
+								errorPrinter.interrupt();
+							}
+							throw new RuntimeException(
+								"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
+						default:
+							receiver.collectBuffer(c, sig);
+							sendReadConfirmation();
+							break;
+					}
+				}
+			}
+		} catch (SocketTimeoutException ignored) {
+			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 136bb69..830c843 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -19,7 +19,6 @@ import org.apache.flink.python.api.PythonPlanBinder;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializer;
 import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
-import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,16 +44,16 @@ import static org.apache.flink.python.api.PythonPlanBinder.PLANBINDER_CONFIG_BCV
 /**
  * This streamer is used by functions to send/receive data to/from an external python process.
  */
-public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
+public class PythonStreamer<S extends PythonSender, OUT> implements Serializable {
 	protected static final Logger LOG = LoggerFactory.getLogger(PythonStreamer.class);
 	private static final long serialVersionUID = -2342256613658373170L;
 
-	private static final int SIGNAL_BUFFER_REQUEST = 0;
-	private static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
-	private static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
-	private static final int SIGNAL_FINISHED = -1;
-	private static final int SIGNAL_ERROR = -2;
-	private static final byte SIGNAL_LAST = 32;
+	protected static final int SIGNAL_BUFFER_REQUEST = 0;
+	protected static final int SIGNAL_BUFFER_REQUEST_G0 = -3;
+	protected static final int SIGNAL_BUFFER_REQUEST_G1 = -4;
+	protected static final int SIGNAL_FINISHED = -1;
+	protected static final int SIGNAL_ERROR = -2;
+	protected static final byte SIGNAL_LAST = 32;
 
 	private final int envID;
 	private final int setID;
@@ -69,7 +68,7 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 	protected transient DataOutputStream out;
 	protected int port;
 
-	protected PythonSender sender;
+	protected S sender;
 	protected PythonReceiver<OUT> receiver;
 
 	protected StringBuilder msg = new StringBuilder();
@@ -79,14 +78,14 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 	protected transient Thread outPrinter;
 	protected transient Thread errorPrinter;
 
-	public PythonStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
+	public PythonStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray, S sender) {
 		this.envID = envID;
 		this.setID = setID;
 		this.usePython3 = PythonPlanBinder.usePython3;
 		planArguments = PythonPlanBinder.arguments.toString();
-		sender = new PythonSender();
 		receiver = new PythonReceiver(usesByteArray);
 		this.function = function;
+		this.sender = sender;
 	}
 
 	/**
@@ -212,13 +211,13 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 		}
 	}
 
-	private void sendWriteNotification(int size, boolean hasNext) throws IOException {
+	protected void sendWriteNotification(int size, boolean hasNext) throws IOException {
 		out.writeInt(size);
 		out.writeByte(hasNext ? 0 : SIGNAL_LAST);
 		out.flush();
 	}
 
-	private void sendReadConfirmation() throws IOException {
+	protected void sendReadConfirmation() throws IOException {
 		out.writeByte(1);
 		out.flush();
 	}
@@ -257,107 +256,4 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
 		}
 	}
-
-	/**
-	 * Sends all values contained in the iterator to the external process and collects all results.
-	 *
-	 * @param i iterator
-	 * @param c collector
-	 * @throws IOException
-	 */
-	public final void streamBufferWithoutGroups(Iterator<IN1> i, Collector<OUT> c) throws IOException {
-		try {
-			int size;
-			if (i.hasNext()) {
-				while (true) {
-					int sig = in.readInt();
-					switch (sig) {
-						case SIGNAL_BUFFER_REQUEST:
-							if (i.hasNext() || sender.hasRemaining(0)) {
-								size = sender.sendBuffer(i, 0);
-								sendWriteNotification(size, sender.hasRemaining(0) || i.hasNext());
-							} else {
-								throw new RuntimeException("External process requested data even though none is available.");
-							}
-							break;
-						case SIGNAL_FINISHED:
-							return;
-						case SIGNAL_ERROR:
-							try {
-								outPrinter.join(1000);
-							} catch (InterruptedException e) {
-								outPrinter.interrupt();
-							}
-							try {
-								errorPrinter.join(1000);
-							} catch (InterruptedException e) {
-								errorPrinter.interrupt();
-							}
-							throw new RuntimeException(
-									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
-						default:
-							receiver.collectBuffer(c, sig);
-							sendReadConfirmation();
-							break;
-					}
-				}
-			}
-		} catch (SocketTimeoutException ignored) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-		}
-	}
-
-	/**
-	 * Sends all values contained in both iterators to the external process and collects all results.
-	 *
-	 * @param i1 iterator
-	 * @param i2 iterator
-	 * @param c collector
-	 * @throws IOException
-	 */
-	public final void streamBufferWithGroups(Iterator<IN1> i1, Iterator<IN2> i2, Collector<OUT> c) throws IOException {
-		try {
-			int size;
-			if (i1.hasNext() || i2.hasNext()) {
-				while (true) {
-					int sig = in.readInt();
-					switch (sig) {
-						case SIGNAL_BUFFER_REQUEST_G0:
-							if (i1.hasNext() || sender.hasRemaining(0)) {
-								size = sender.sendBuffer(i1, 0);
-								sendWriteNotification(size, sender.hasRemaining(0) || i1.hasNext());
-							}
-							break;
-						case SIGNAL_BUFFER_REQUEST_G1:
-							if (i2.hasNext() || sender.hasRemaining(1)) {
-								size = sender.sendBuffer(i2, 1);
-								sendWriteNotification(size, sender.hasRemaining(1) || i2.hasNext());
-							}
-							break;
-						case SIGNAL_FINISHED:
-							return;
-						case SIGNAL_ERROR:
-							try {
-								outPrinter.join(1000);
-							} catch (InterruptedException e) {
-								outPrinter.interrupt();
-							}
-							try {
-								errorPrinter.join(1000);
-							} catch (InterruptedException e) {
-								errorPrinter.interrupt();
-							}
-							throw new RuntimeException(
-									"External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
-						default:
-							receiver.collectBuffer(c, sig);
-							sendReadConfirmation();
-							break;
-					}
-				}
-			}
-		} catch (SocketTimeoutException ignored) {
-			throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " stopped responding." + msg);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
new file mode 100644
index 0000000..ef80c98
--- /dev/null
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIterator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.Iterator;
+
+/**
+ * This class is a wrapper for an {@link Iterator} that supports pushing back a single record.
+ *
+ * @param <IN> input type
+ */
+class SingleElementPushBackIterator<IN> {
+
+	private IN pushBack;
+	private final Iterator<IN> iterator;
+
+	SingleElementPushBackIterator(Iterator<IN> iterator) {
+		this.pushBack = null;
+		this.iterator = iterator;
+	}
+
+	public boolean hasNext() {
+		return pushBack != null || iterator.hasNext();
+	}
+
+	public IN next() {
+		if (pushBack != null) {
+			IN obj = pushBack;
+			pushBack = null;
+			return obj;
+		} else {
+			return iterator.next();
+		}
+	}
+
+	public void pushBack(IN element) {
+		Preconditions.checkState(pushBack == null, "Already contains an element that was pushed back. This indicates a programming error.");
+		pushBack = element;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7251c56/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
new file mode 100644
index 0000000..5e9eb42
--- /dev/null
+++ b/flink-libraries/flink-python/src/test/java/org/apache/flink/python/api/streaming/data/SingleElementPushBackIteratorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.python.api.streaming.data;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+
+public class SingleElementPushBackIteratorTest {
+
+	@Test
+	public void testPushBackIterator() {
+		Collection<Integer> init = new ArrayList<>();
+		init.add(1);
+		init.add(2);
+		init.add(4);
+		init.add(5);
+		SingleElementPushBackIterator<Integer> iterator = new SingleElementPushBackIterator<>(init.iterator());
+
+		Assert.assertTrue(iterator.hasNext());
+		Assert.assertEquals(1, (int) iterator.next());
+
+		Assert.assertTrue(iterator.hasNext());
+		Assert.assertEquals(2, (int) iterator.next());
+
+		Assert.assertTrue(iterator.hasNext());
+		iterator.pushBack(3);
+		Assert.assertTrue(iterator.hasNext());
+		Assert.assertEquals(3, (int) iterator.next());
+
+		Assert.assertTrue(iterator.hasNext());
+		Assert.assertEquals(4, (int) iterator.next());
+
+		Assert.assertTrue(iterator.hasNext());
+		Assert.assertEquals(5, (int) iterator.next());
+
+		Assert.assertFalse(iterator.hasNext());
+		iterator.pushBack(6);
+		Assert.assertTrue(iterator.hasNext());
+		Assert.assertEquals(6, (int) iterator.next());
+
+		Assert.assertFalse(iterator.hasNext());
+	}
+
+	@Test
+	public void testSingleElementLimitation() {
+		Collection<Integer> init = Collections.emptyList();
+		SingleElementPushBackIterator<Integer> iterator = new SingleElementPushBackIterator<>(init.iterator());
+		Assert.assertFalse(iterator.hasNext());
+		iterator.pushBack(1);
+		try {
+			iterator.pushBack(2);
+			Assert.fail("Multiple elements could be pushed back.");
+		} catch (IllegalStateException ignored) {
+			// expected
+		}
+	}
+}


Mime
View raw message