flink-commits mailing list archives

From: se...@apache.org
Subject: [25/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Refactor, cleanup, and fix kafka consumers
Date: Thu, 27 Aug 2015 11:25:42 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
new file mode 100644
index 0000000..a3c35fc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/java/org/apache/flink/streaming/connectors/testutils/ValidatingExactlyOnceSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.testutils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.util.BitSet;
+
+public class ValidatingExactlyOnceSink implements SinkFunction<Integer>, Checkpointed<Tuple2<Integer, BitSet>> {
+
+	private static final long serialVersionUID = 1748426382527469932L;
+	
+	private final int numElementsTotal;
+	
+	private BitSet duplicateChecker = new BitSet();  // this is checkpointed
+
+	private int numElements; // this is checkpointed
+
+	
+	public ValidatingExactlyOnceSink(int numElementsTotal) {
+		this.numElementsTotal = numElementsTotal;
+	}
+
+	
+	@Override
+	public void invoke(Integer value) throws Exception {
+		numElements++;
+		
+		if (duplicateChecker.get(value)) {
+			throw new Exception("Received a duplicate");
+		}
+		duplicateChecker.set(value);
+		
+		if (numElements == numElementsTotal) {
+			// validate
+			if (duplicateChecker.cardinality() != numElementsTotal) {
+				throw new Exception("Duplicate checker has wrong cardinality");
+			}
+			else if (duplicateChecker.nextClearBit(0) != numElementsTotal) {
+				throw new Exception("Received sparse sequence");
+			}
+			else {
+				throw new SuccessException();
+			}
+		}
+	}
+
+	@Override
+	public Tuple2<Integer, BitSet> snapshotState(long checkpointId, long checkpointTimestamp) {
+		return new Tuple2<>(numElements, duplicateChecker);
+	}
+
+	@Override
+	public void restoreState(Tuple2<Integer, BitSet> state) {
+		this.numElements = state.f0;
+		this.duplicateChecker = state.f1;
+	}
+}
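
For orientation, here is a minimal sketch of how such a sink is typically wired into a checkpointed test job. Everything below (the generator source, the run/cancel-style SourceFunction, the checkpoint interval, the class name) is an illustrative assumption, not part of this commit; the Kafka tests drive jobs like this through their own helpers and treat the SuccessException thrown above as the success signal.

// Hypothetical usage sketch -- names and values are assumptions for illustration.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.testutils.ValidatingExactlyOnceSink;

public class ExactlyOnceSinkSketch {

	public static void main(String[] args) throws Exception {
		final int numElements = 10000;

		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
		env.enableCheckpointing(500);  // checkpoints are what make the exactly-once validation meaningful

		env.addSource(new SourceFunction<Integer>() {
			private volatile boolean running = true;

			@Override
			public void run(SourceContext<Integer> ctx) throws Exception {
				// emit each value exactly once; the sink fails on duplicates and on gaps
				for (int i = 0; i < numElements && running; i++) {
					ctx.collect(i);
				}
			}

			@Override
			public void cancel() {
				running = false;
			}
		}).addSink(new ValidatingExactlyOnceSink(numElements));

		// the sink reports completion by throwing SuccessException, which a test
		// harness (e.g. tryExecute in KafkaITCase) unwraps as success
		env.execute("exactly-once sink sketch");
	}
}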

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/resources/log4j-test.properties
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/resources/log4j-test.properties
index 9ede613..ded15e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=INFO, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err
@@ -24,4 +24,6 @@ log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
 # suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
+
+

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 8655e0c..8c1883e 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -54,6 +54,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TypeInfoParser;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.net.NetUtils;
@@ -91,7 +92,7 @@ import scala.collection.Seq;
  * <p/>
  * https://github.com/sakserv/hadoop-mini-clusters (ASL licensed)
  */
-
+@SuppressWarnings("serial")
 public class KafkaITCase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
@@ -377,7 +378,12 @@ public class KafkaITCase {
 
 	private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception {
 		LOG.info("Reading sequence for verification until final count {}", finalCount);
-		TestPersistentKafkaSource<Tuple2<Integer, Integer>> pks = new TestPersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()), cc);
+
+		TypeInformation<Tuple2<Integer, Integer>> tuple2info = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		
+		TestPersistentKafkaSource<Tuple2<Integer, Integer>> pks = new TestPersistentKafkaSource<>(topicName,
+				new TypeInformationSerializationSchema<>(tuple2info, env.getConfig()), cc);
+		
 		DataStream<Tuple2<Integer, Integer>> source = env.addSource(pks).map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() {
 			// we need to slow down the source so that it can participate in a few checkpoints.
 			// Otherwise it would write its data into buffers and shut down.
@@ -428,6 +434,9 @@ public class KafkaITCase {
 
 	private void writeSequence(StreamExecutionEnvironment env, String topicName, final int from, final int to) throws Exception {
 		LOG.info("Writing sequence from {} to {} to topic {}", from, to, topicName);
+
+		TypeInformation<Tuple2<Integer, Integer>> tuple2info = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		
 		DataStream<Tuple2<Integer, Integer>> stream = env.addSource(new RichParallelSourceFunction<Tuple2<Integer, Integer>>() {
 			private static final long serialVersionUID = 1L;
 			boolean running = true;
@@ -457,7 +466,7 @@ public class KafkaITCase {
 		}).setParallelism(3);
 		stream.addSink(new KafkaSink<Tuple2<Integer, Integer>>(brokerConnectionStrings,
 				topicName,
-				new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()),
+				new TypeInformationSerializationSchema<>(tuple2info, env.getConfig()),
 				new T2Partitioner()
 		)).setParallelism(3);
 		env.execute("Write sequence from " + from + " to " + to + " to topic " + topicName);
@@ -472,6 +481,8 @@ public class KafkaITCase {
 			if(numPartitions != 3) {
 				throw new IllegalArgumentException("Expected three partitions");
 			}
+			
+			@SuppressWarnings("unchecked")
 			Tuple2<Integer, Integer> element = (Tuple2<Integer, Integer>) key;
 			return element.f0;
 		}
@@ -485,10 +496,14 @@ public class KafkaITCase {
 		String topic = "regularKafkaSourceTestTopic";
 		createTestTopic(topic, 1, 1);
 
+		TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup", new TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), 5000));
+				new KafkaSource<Tuple2<Long, String>>(zookeeperConnectionString, topic, "myFlinkGroup",
+						new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()), 5000));
+		
 		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
 			private static final long serialVersionUID = 1L;
 
@@ -545,7 +560,8 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, 
+				new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig())));
 
 		tryExecute(env, "regular kafka source test");
 
@@ -559,12 +575,14 @@ public class KafkaITCase {
 		String topic = "tupleTestTopic";
 		createTestTopic(topic, 1, 1);
 
+		TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+		
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
-				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
-						new TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+				new PersistentKafkaSource<>(topic,
+						new TypeInformationSerializationSchema<>(longStringInfo, env.getConfig()),
 						standardCC
 				));
 		consuming.addSink(new RichSinkFunction<Tuple2<Long, String>>() {
@@ -631,7 +649,8 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig())));
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, 
+				new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig())));
 
 		tryExecute(env, "tupletesttopology");
 
@@ -653,10 +672,14 @@ public class KafkaITCase {
 		String topic = "bigRecordTestTopic";
 		createTestTopic(topic, 1, 1);
 
+		final TypeInformation<Tuple2<Long, byte[]>> longBytesInfo = TypeInfoParser.parse("Tuple2<Long, byte[]>");
+		
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		// add consuming topology:
-		TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig());
+		TypeInformationSerializationSchema<Tuple2<Long, byte[]>> serSchema = 
+				new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, env.getConfig());
+		
 		Properties consumerProps = new Properties();
 		consumerProps.setProperty("fetch.message.max.bytes", Integer.toString(1024 * 1024 * 30));
 		consumerProps.setProperty("zookeeper.connect", zookeeperConnectionString);
@@ -734,7 +757,7 @@ public class KafkaITCase {
 		});
 
 		stream.addSink(new KafkaSink<Tuple2<Long, byte[]>>(brokerConnectionStrings, topic,
-						new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(new Tuple2<Long, byte[]>(0L, new byte[]{0}), env.getConfig()))
+						new TypeInformationSerializationSchema<Tuple2<Long, byte[]>>(longBytesInfo, env.getConfig()))
 		);
 
 		tryExecute(env, "big topology test");
@@ -751,12 +774,14 @@ public class KafkaITCase {
 
 		createTestTopic(topic, 3, 1);
 
+		final TypeInformation<Tuple2<Long, String>> longStringInfo = TypeInfoParser.parse("Tuple2<Long, String>");
+		
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
 		// add consuming topology:
 		DataStreamSource<Tuple2<Long, String>> consuming = env.addSource(
 				new PersistentKafkaSource<Tuple2<Long, String>>(topic,
-						new TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()),
+						new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig()),
 						standardCC));
 		consuming.addSink(new SinkFunction<Tuple2<Long, String>>() {
 			private static final long serialVersionUID = 1L;
@@ -829,7 +854,8 @@ public class KafkaITCase {
 				running = false;
 			}
 		});
-		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic, new TypeInformationSerializationSchema<Tuple2<Long, String>>(new Tuple2<Long, String>(1L, ""), env.getConfig()), new CustomPartitioner()));
+		stream.addSink(new KafkaSink<Tuple2<Long, String>>(brokerConnectionStrings, topic,
+				new TypeInformationSerializationSchema<Tuple2<Long, String>>(longStringInfo, env.getConfig()), new CustomPartitioner()));
 
 		tryExecute(env, "custom partitioning test");
 
@@ -1138,7 +1164,6 @@ public class KafkaITCase {
 
 	/**
 	 * Read topic to list, only using Kafka code.
-	 * @return
 	 */
 	private static List<MessageAndMetadata<byte[], byte[]>> readTopicToList(String topicName, ConsumerConfig config, final int stopAfter) {
 		ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(config);
@@ -1172,7 +1197,7 @@ public class KafkaITCase {
 		return result;
 	}
 
-	private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema deserializationSchema, int stopAfter){
+	private static void printTopic(String topicName, ConsumerConfig config, DeserializationSchema<?> deserializationSchema, int stopAfter){
 		List<MessageAndMetadata<byte[], byte[]>> contents = readTopicToList(topicName, config, stopAfter);
 		LOG.info("Printing contents of topic {} in consumer group {}", topicName, config.groupId());
 		for(MessageAndMetadata<byte[], byte[]> message: contents) {
@@ -1190,7 +1215,10 @@ public class KafkaITCase {
 		newProps.setProperty("zookeeper.connect", standardCC.zkConnect());
 
 		ConsumerConfig printerConfig = new ConsumerConfig(newProps);
-		DeserializationSchema deserializer = new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), ec);
+		TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeInfoParser.parse("Tuple2<Integer, Integer>");
+		
+		DeserializationSchema<Tuple2<Integer, Integer>> deserializer = 
+				new TypeInformationSerializationSchema<>(typeInfo, ec);
 		printTopic(topicName, printerConfig, deserializer, elements);
 	}
 

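The recurring change in KafkaITCase above is the switch from building a TypeInformationSerializationSchema out of a throwaway sample element to describing the type explicitly. Pulled out of the diff, the new pattern looks roughly like this (the class and method names below are mine; the APIs are the ones used in the diff):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class SchemaConstructionSketch {

	public static TypeInformationSerializationSchema<Tuple2<Integer, Integer>> buildSchema(ExecutionConfig ec) {
		// before: new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), ec)
		// after:  state the type up front instead of inferring it from a sample instance
		TypeInformation<Tuple2<Integer, Integer>> info = TypeInfoParser.parse("Tuple2<Integer, Integer>");
		return new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(info, ec);
	}
}

This avoids constructing dummy tuples just to derive type information and makes the declared type visible at the call site.
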
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
deleted file mode 100644
index 5c752ca..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.connectors.kafka.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class UtilsTest {
-
-	/**
-	 * Ensure that the returned byte array has the expected size
-	 */
-	@Test
-	public void testTypeInformationSerializationSchema() {
-		final ExecutionConfig ec = new ExecutionConfig();
-
-		Tuple2<Integer, Integer> test = new Tuple2<Integer, Integer>(1,666);
-
-		TypeInformationSerializationSchema<Tuple2<Integer, Integer>> ser = new TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(test, ec);
-
-		byte[] res = ser.serialize(test);
-		Assert.assertEquals(8, res.length);
-
-		Tuple2<Integer, Integer> another = ser.deserialize(res);
-		Assert.assertEquals(test.f0, another.f0);
-		Assert.assertEquals(test.f1, another.f1);
-	}
-
-	@Test
-	public void testGrowing() {
-		final ExecutionConfig ec = new ExecutionConfig();
-
-		Tuple2<Integer, byte[]> test1 = new Tuple2<Integer, byte[]>(1, new byte[16]);
-
-		TypeInformationSerializationSchema<Tuple2<Integer, byte[]>> ser = new TypeInformationSerializationSchema<Tuple2<Integer, byte[]>>(test1, ec);
-
-		byte[] res = ser.serialize(test1);
-		Assert.assertEquals(24, res.length);
-		Tuple2<Integer, byte[]> another = ser.deserialize(res);
-		Assert.assertEquals(16, another.f1.length);
-
-		test1 = new Tuple2<Integer, byte[]>(1, new byte[26]);
-
-		res = ser.serialize(test1);
-		Assert.assertEquals(34, res.length);
-		another = ser.deserialize(res);
-		Assert.assertEquals(26, another.f1.length);
-
-		test1 = new Tuple2<Integer, byte[]>(1, new byte[1]);
-
-		res = ser.serialize(test1);
-		Assert.assertEquals(9, res.length);
-		another = ser.deserialize(res);
-		Assert.assertEquals(1, another.f1.length);
-	}
-
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 0bcdb74..f38c557 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
  * the blocks are released.</p>
  */
-public class BarrierBuffer implements CheckpointBarrierHandler {
+public class BarrierBuffer implements CheckpointBarrierHandler, Runnable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
 	
@@ -78,6 +78,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private boolean endOfStream;
 
 
+	private int returnedBuffers;
+	
+	private int spilledBuffers;
+	
+	private int reReadBuffers;
+	
+	
+	private Thread debugPrinter;
+	
+	private volatile boolean printerRunning = true;
+	
 	/**
 	 * 
 	 * @param inputGate The input gate to draw the buffers and events from.
@@ -92,6 +103,10 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		
 		this.bufferSpiller = new BufferSpiller(ioManager, inputGate.getPageSize());
 		this.queuedBuffered = new ArrayDeque<BufferSpiller.SpilledBufferOrEventSequence>();
+		
+		this.debugPrinter = new Thread(this, "BB debugger");
+		this.debugPrinter.setDaemon(true);
+		this.debugPrinter.start();
 	}
 
 	// ------------------------------------------------------------------------
@@ -112,14 +127,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					completeBufferedSequence();
 					return getNextNonBlocked();
 				}
+				else if (next.isBuffer()) {
+					reReadBuffers++;
+				}
 			}
 			
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 				// if the channel is blocked, we just store the BufferOrEvent
 					bufferSpiller.add(next);
+					if (next.isBuffer()) {
+						spilledBuffers++;
+					}
 				}
 				else if (next.isBuffer()) {
+					returnedBuffers++;
 					return next;
 				}
 				else if (next.getEvent().getClass() == CheckpointBarrier.class) {
@@ -223,6 +245,9 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	@Override
 	public void cleanup() throws IOException {
+		printerRunning = false;
+		debugPrinter.interrupt();
+		
 		bufferSpiller.close();
 		if (currentBuffered != null) {
 			currentBuffered.cleanup();
@@ -318,4 +343,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		return String.format("last checkpoint: %d, current barriers: %d, closed channels: %d",
 				currentCheckpointId, numBarriersReceived, numClosedChannels);
 	}
+	
+	// -------------------------------------
+	// TEMP HACK for debugging
+	
+	public void run() {
+		while (printerRunning) {
+			try {
+				Thread.sleep(5000);
+			}
+			catch (InterruptedException e) {
+				// ignore
+			}
+			
+			LOG.info("=====================> BARRIER BUFFER: returned buffers: {}, spilled buffers: {}, re-read buffers: {}",
+					returnedBuffers, spilledBuffers, reReadBuffers);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 9dd2b2d..6ff9712 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -50,17 +50,6 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Creates a new de-/serialization schema for the type of the given object instance.
-	 * The type will be passed through the {@link TypeExtractor} to create its type information.
-	 *
-	 * @param type A sample type instance for which the type information will be created.
-	 * @param ec The execution config, which is used to parametrize the type serializers.
-	 */
-	public TypeInformationSerializationSchema(T type, ExecutionConfig ec) {
-		this(TypeExtractor.getForObject(type), ec);
-	}
-
-	/**
 	 * Creates a new de-/serialization schema for the given type.
 	 * 
 	 * @param typeInfo The type information for the type de-/serialized by this schema.

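The constructor removed here simply delegated to TypeExtractor.getForObject, so a caller that only has a sample element in hand can keep the old behavior by doing that step explicitly. A minimal sketch under that assumption (the helper class and method names are made up):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;

public class SchemaMigrationSketch {

	// equivalent to what the deleted convenience constructor did internally
	public static <T> TypeInformationSerializationSchema<T> fromSample(T sample, ExecutionConfig ec) {
		TypeInformation<T> info = TypeExtractor.getForObject(sample);
		return new TypeInformationSerializationSchema<T>(info, ec);
	}
}
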
http://git-wip-us.apache.org/repos/asf/flink/blob/b9892a0e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
index 28b26cb..8c847d3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
@@ -19,11 +19,11 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
+
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -77,20 +77,6 @@ public class TypeInformationSerializationSchemaTest {
 			fail(e.getMessage());
 		}
 	}
-
-	@Test
-	public void testCreateFromType() {
-		try {
-			TypeInformationSerializationSchema<String> schema = 
-					new TypeInformationSerializationSchema<String>("", new ExecutionConfig());
-			
-			assertEquals(BasicTypeInfo.STRING_TYPE_INFO, schema.getProducedType());
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
 	
 	// ------------------------------------------------------------------------
 	//  Test data types

