flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [1/3] flink git commit: [FLINK-1625] [streaming] Refactored StreamVertex and subclasses to clean up after invoke and properly log and propagate exceptions
Date Wed, 04 Mar 2015 21:53:14 GMT
Repository: flink
Updated Branches:
  refs/heads/master e1e03062c -> 08ef02eba


[FLINK-1625] [streaming] Refactored StreamVertex and subclasses to clean up after invoke and
properly log and propagate exceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3abd6c8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3abd6c8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3abd6c8f

Branch: refs/heads/master
Commit: 3abd6c8f4d45b1dc37a8d832a699a4d7fea34332
Parents: e1e0306
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Mar 3 11:49:37 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Wed Mar 4 22:38:06 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/collector/StreamOutput.java   |  4 ++
 .../api/streamvertex/CoStreamVertex.java        | 22 ++----
 .../api/streamvertex/InputHandler.java          |  6 ++
 .../api/streamvertex/OutputHandler.java         | 27 ++------
 .../api/streamvertex/StreamIterationHead.java   | 54 ++++++++-------
 .../api/streamvertex/StreamIterationTail.java   | 22 ++++--
 .../api/streamvertex/StreamVertex.java          | 70 ++++++++++++++------
 .../flink/streaming/io/CoRecordReader.java      | 58 ++++++++++------
 8 files changed, 157 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
index 4551c5a..a497119 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
@@ -83,4 +83,8 @@ public class StreamOutput<OUT> implements Collector<OUT> {
 		}
 	}
 
+	public void clearBuffers() {
+		output.clearBuffers();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index df7bcad..6957652 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.streamvertex;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.CoReaderIterator;
@@ -41,11 +40,9 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1,
OUT> {
 	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>
coReader;
 	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
 
-	private CoInvokable<IN1, IN2, OUT> userInvokable;
 	private static int numTasks;
 
 	public CoStreamVertex() {
-		userInvokable = null;
 		numTasks = newVertex();
 		instanceID = numTasks;
 	}
@@ -66,9 +63,9 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1,
OUT> {
 	}
 
 	@Override
-	protected void setInvokable() {
-		userInvokable = configuration.getUserInvokable(userClassLoader);
-		userInvokable.setup(this);
+	public void clearBuffers() {
+		outputHandler.clearWriters();
+		coReader.clearBuffers();
 	}
 
 	protected void setConfigInputs() throws StreamVertexException {
@@ -94,21 +91,16 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1,
OUT> {
 			}
 		}
 
-		final InputGate reader1 = inputList1.size() == 1 ? inputList1.get(0)
-				: new UnionInputGate(inputList1.toArray(new InputGate[inputList1.size()]));
+		final InputGate reader1 = inputList1.size() == 1 ? inputList1.get(0) : new UnionInputGate(
+				inputList1.toArray(new InputGate[inputList1.size()]));
 
-		final InputGate reader2 = inputList2.size() == 1 ? inputList2.get(0)
-				: new UnionInputGate(inputList2.toArray(new InputGate[inputList2.size()]));
+		final InputGate reader2 = inputList2.size() == 1 ? inputList2.get(0) : new UnionInputGate(
+				inputList2.toArray(new InputGate[inputList2.size()]));
 
 		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>,
DeserializationDelegate<StreamRecord<IN2>>>(
 				reader1, reader2);
 	}
 
-	@Override
-	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("CO-TASK", userInvokable);
-	}
-
 	@SuppressWarnings("unchecked")
 	@Override
 	public <X> MutableObjectIterator<X> getInput(int index) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index 73dbfce..e8a2ce1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -90,4 +90,10 @@ public class InputHandler<IN> {
 	public IndexedReaderIterator<StreamRecord<IN>> getInputIter() {
 		return inputIter;
 	}
+
+	public void clearReaders() {
+		if (inputs != null) {
+			inputs.clearBuffers();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 911f060..359675d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -32,7 +32,6 @@ import org.apache.flink.streaming.api.collector.CollectorWrapper;
 import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
 import org.apache.flink.streaming.io.StreamRecordWriter;
@@ -166,8 +165,8 @@ public class OutputHandler<OUT> {
 	 *            The config of upStream task
 	 * @return
 	 */
-	private <T> StreamOutput<T> createStreamOutput(Integer outputVertex, StreamConfig
configuration,
-			int outputIndex) {
+	private <T> StreamOutput<T> createStreamOutput(Integer outputVertex,
+			StreamConfig configuration, int outputIndex) {
 
 		StreamRecordSerializer<T> outSerializer = configuration
 				.getTypeSerializerOut1(vertex.userClassLoader);
@@ -218,25 +217,9 @@ public class OutputHandler<OUT> {
 		}
 	}
 
-	public void invokeUserFunction(String componentTypeName, StreamInvokable<?, OUT> userInvokable)
-			throws IOException, InterruptedException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoked with instance id {}", componentTypeName, vertex.getName(),
-					vertex.getInstanceID());
+	public void clearWriters() {
+		for (StreamOutput<?> output : outputMap.values()) {
+			output.clearBuffers();
 		}
-
-		try {
-			vertex.invokeUserFunction(userInvokable);
-		} catch (Exception e) {
-			flushOutputs();
-			throw new RuntimeException(e);
-		}
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("{} {} invoke finished instance id {}", componentTypeName, vertex.getName(),
-					vertex.getInstanceID());
-		}
-
-		flushOutputs();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index bc89241..adbdde3 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -17,8 +17,7 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Collection;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
 import org.apache.flink.streaming.io.BlockingQueueBroker;
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,7 +33,7 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT,
OUT> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamIterationHead.class);
 
-	private OutputHandler<OUT> outputHandler;
+	private Collection<StreamOutput<?>> outputs;
 
 	private static int numSources;
 	private Integer iterationId;
@@ -52,6 +52,7 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT,
OUT> {
 	@Override
 	public void setInputsOutputs() {
 		outputHandler = new OutputHandler<OUT>(this);
+		outputs = outputHandler.getOutputs();
 
 		iterationId = configuration.getIterationId();
 		iterationWaitTime = configuration.getIterationWaitTime();
@@ -60,39 +61,46 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT,
OUT> {
 		try {
 			BlockingQueueBroker.instance().handIn(iterationId.toString(), dataChannel);
 		} catch (Exception e) {
-
+			throw new RuntimeException(e);
 		}
+
 	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SOURCE {} invoked with instance id {}", getName(), getInstanceID());
-		}
-
-		StreamRecord<OUT> nextRecord;
-
-		List<StreamOutput<OUT>> outputs = new LinkedList<StreamOutput<OUT>>();
-		for (StreamOutput<?> output : outputHandler.getOutputs()) {
-			outputs.add((StreamOutput<OUT>) output);
+			LOG.debug("Iteration source {} invoked with instance id {}", getName(), getInstanceID());
 		}
 
-		while (true) {
-			if (shouldWait) {
-				nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				nextRecord = dataChannel.take();
-			}
-			if (nextRecord == null) {
-				break;
+		try {
+			StreamRecord<OUT> nextRecord;
+
+			while (true) {
+				if (shouldWait) {
+					nextRecord = dataChannel.poll(iterationWaitTime, TimeUnit.MILLISECONDS);
+				} else {
+					nextRecord = dataChannel.take();
+				}
+				if (nextRecord == null) {
+					break;
+				}
+				for (StreamOutput<?> output : outputs) {
+					((StreamOutput<OUT>) output).collect(nextRecord.getObject());
+				}
 			}
-			for (StreamOutput<OUT> output : outputs) {
-				output.collect(nextRecord.getObject());
+
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Iteration source failed due to: {}", StringUtils.stringifyException(e));
 			}
+			throw e;
+		} finally {
+			// Cleanup
+			outputHandler.flushOutputs();
+			clearBuffers();
 		}
 
-		outputHandler.flushOutputs();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
index b3ecdf8..53e8750 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -59,13 +59,23 @@ public class StreamIterationTail<IN> extends StreamVertex<IN,
IN> {
 	@Override
 	public void invoke() throws Exception {
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoked", getName());
+			LOG.debug("Iteration sink {} invoked", getName());
 		}
 
-		forwardRecords();
+		try {
+			forwardRecords();
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("SINK {} invoke finished", getName());
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Iteration sink {} invoke finished", getName());
+			}
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("Iteration sink failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			// Cleanup
+			clearBuffers();
 		}
 	}
 
@@ -75,12 +85,11 @@ public class StreamIterationTail<IN> extends StreamVertex<IN,
IN> {
 			if (!pushToQueue(reuse)) {
 				break;
 			}
-			// TODO: Fix object reuse for iteration
 			reuse = inputHandler.getInputSerializer().createInstance();
 		}
 	}
 
-	private boolean pushToQueue(StreamRecord<IN> record) {
+	private boolean pushToQueue(StreamRecord<IN> record) throws InterruptedException {
 		try {
 			if (shouldWait) {
 				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
@@ -92,6 +101,7 @@ public class StreamIterationTail<IN> extends StreamVertex<IN,
IN> {
 			if (LOG.isErrorEnabled()) {
 				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
 						StringUtils.stringifyException(e));
+				throw e;
 			}
 			return false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
index 5033357..bd25e72 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamVertex.java
@@ -30,9 +30,14 @@ import org.apache.flink.streaming.io.IndexedReaderIterator;
 import org.apache.flink.streaming.state.OperatorState;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class StreamVertex<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>
{
 
+	private static final Logger LOG = LoggerFactory.getLogger(StreamVertex.class);
+
 	private static int numTasks;
 
 	protected StreamConfig configuration;
@@ -73,24 +78,6 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 		this.context = createRuntimeContext(getEnvironment().getTaskName(), this.states);
 	}
 
-	protected <T> void invokeUserFunction(StreamInvokable<?, T> userInvokable) throws
Exception {
-		userInvokable.setRuntimeContext(context);
-		userInvokable.open(getTaskConfiguration());
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.setRuntimeContext(context);
-			invokable.open(getTaskConfiguration());
-		}
-
-		userInvokable.invoke();
-		userInvokable.close();
-
-		for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
-			invokable.close();
-		}
-
-	}
-
 	public void setInputsOutputs() {
 		inputHandler = new InputHandler<IN>(this);
 		outputHandler = new OutputHandler<OUT>(this);
@@ -118,7 +105,52 @@ public class StreamVertex<IN, OUT> extends AbstractInvokable implements
StreamTa
 
 	@Override
 	public void invoke() throws Exception {
-		outputHandler.invokeUserFunction("TASK", userInvokable);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Task {} invoked with instance id {}", getName(), getInstanceID());
+		}
+
+		try {
+			userInvokable.setRuntimeContext(context);
+			userInvokable.open(getTaskConfiguration());
+
+			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+				invokable.setRuntimeContext(context);
+				invokable.open(getTaskConfiguration());
+			}
+
+			userInvokable.invoke();
+
+			userInvokable.close();
+
+			for (ChainableInvokable<?, ?> invokable : outputHandler.chainedInvokables) {
+				invokable.close();
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Task {} invoke finished instance id {}", getName(), getInstanceID());
+			}
+
+		} catch (Exception e) {
+			if (LOG.isErrorEnabled()) {
+				LOG.error("StreamInvokable failed due to: {}", StringUtils.stringifyException(e));
+			}
+			throw e;
+		} finally {
+			// Cleanup
+			outputHandler.flushOutputs();
+			clearBuffers();
+		}
+
+	}
+
+	protected void clearBuffers() {
+		if (outputHandler != null) {
+			outputHandler.clearWriters();
+		}
+		if (inputHandler != null) {
+			inputHandler.clearReaders();
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/3abd6c8f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 84b08f7..bb20ecb 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
@@ -36,7 +37,8 @@ import java.util.concurrent.LinkedBlockingQueue;
  * types to read records effectively.
  */
 @SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable>
extends AbstractReader implements EventListener<InputGate> {
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable>
extends
+		AbstractReader implements EventListener<InputGate> {
 
 	private final InputGate bufferReader1;
 
@@ -63,8 +65,10 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends
IOReadable
 		this.bufferReader1 = bufferReader1;
 		this.bufferReader2 = bufferReader2;
 
-		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1.getNumberOfInputChannels()];
-		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2.getNumberOfInputChannels()];
+		this.reader1RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader1
+				.getNumberOfInputChannels()];
+		this.reader2RecordDeserializers = new AdaptiveSpanningRecordDeserializer[bufferReader2
+				.getNumberOfInputChannels()];
 
 		for (int i = 0; i < reader1RecordDeserializers.length; i++) {
 			reader1RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T1>();
@@ -104,7 +108,8 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends
IOReadable
 			if (currentReaderIndex == 1) {
 				while (true) {
 					if (reader1currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer.getNextRecord(target1);
+						RecordDeserializer.DeserializationResult result = reader1currentRecordDeserializer
+								.getNextRecord(target1);
 
 						if (result.isBufferConsumed()) {
 							reader1currentRecordDeserializer.getCurrentBuffer().recycle();
@@ -116,27 +121,26 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends
IOReadable
 						if (result.isFullRecord()) {
 							return 1;
 						}
-					}
-					else {
+					} else {
 
 						final BufferOrEvent boe = bufferReader1.getNextBufferOrEvent();
 
 						if (boe.isBuffer()) {
-							reader1currentRecordDeserializer = reader1RecordDeserializers[boe.getChannelIndex()];
+							reader1currentRecordDeserializer = reader1RecordDeserializers[boe
+									.getChannelIndex()];
 							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						}
-						else if (handleEvent(boe.getEvent())) {
+						} else if (handleEvent(boe.getEvent())) {
 							currentReaderIndex = 0;
 
 							break;
 						}
 					}
 				}
-			}
-			else if (currentReaderIndex == 2) {
+			} else if (currentReaderIndex == 2) {
 				while (true) {
 					if (reader2currentRecordDeserializer != null) {
-						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer.getNextRecord(target2);
+						RecordDeserializer.DeserializationResult result = reader2currentRecordDeserializer
+								.getNextRecord(target2);
 
 						if (result.isBufferConsumed()) {
 							reader2currentRecordDeserializer.getCurrentBuffer().recycle();
@@ -148,23 +152,21 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends
IOReadable
 						if (result.isFullRecord()) {
 							return 2;
 						}
-					}
-					else {
+					} else {
 						final BufferOrEvent boe = bufferReader2.getNextBufferOrEvent();
 
 						if (boe.isBuffer()) {
-							reader2currentRecordDeserializer = reader2RecordDeserializers[boe.getChannelIndex()];
+							reader2currentRecordDeserializer = reader2RecordDeserializers[boe
+									.getChannelIndex()];
 							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
-						}
-						else if (handleEvent(boe.getEvent())) {
+						} else if (handleEvent(boe.getEvent())) {
 							currentReaderIndex = 0;
 
 							break;
 						}
 					}
 				}
-			}
-			else {
+			} else {
 				throw new IllegalStateException("Bug: unexpected current reader index.");
 			}
 		}
@@ -182,9 +184,23 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends
IOReadable
 	public void onEvent(InputGate bufferReader) {
 		if (bufferReader == bufferReader1) {
 			availableRecordReaders.add(1);
-		}
-		else if (bufferReader == bufferReader2) {
+		} else if (bufferReader == bufferReader2) {
 			availableRecordReaders.add(2);
 		}
 	}
+
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : reader1RecordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+		for (RecordDeserializer<?> deserializer : reader2RecordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
 }


Mime
View raw message