flink-commits mailing list archives

From se...@apache.org
Subject [1/2] flink git commit: [FLINK-2004] Fix memory leak in presence of failed checkpoints in Kafka source
Date Mon, 01 Jun 2015 15:58:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master ac43a69c1 -> 033409190


[FLINK-2004] Fix memory leak in presence of failed checkpoints in Kafka source

This closes #674
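
Background for the diff below: the leak came from tracking the offsets of
pending checkpoints in a plain HashMap that was only drained when a
checkpoint was later committed, so entries for checkpoints that failed were
never removed. The fix keeps the pending offsets in an insertion-ordered
LinkedMap and, on commit, also discards every older pending entry. A minimal
sketch of that pattern (illustrative only, not the committed code; class and
method names are made up):

    // Pending checkpoints live in an insertion-ordered LinkedMap
    // (commons-collections). Committing checkpoint N also prunes all
    // older pending entries, so failed checkpoints cannot accumulate.
    import org.apache.commons.collections.map.LinkedMap;

    public class PendingCheckpointBookkeeping {

        private final LinkedMap pendingCheckpoints = new LinkedMap();

        // called when a checkpoint is triggered (cf. snapshotState())
        public void onSnapshot(long checkpointId, long[] offsets) {
            pendingCheckpoints.put(checkpointId, offsets);
        }

        // called when a checkpoint completes (cf. commitCheckpoint())
        public long[] onCommit(long checkpointId) {
            int posInMap = pendingCheckpoints.indexOf(checkpointId);
            if (posInMap == -1) {
                return null; // unknown or already pruned checkpoint
            }
            long[] offsets = (long[]) pendingCheckpoints.remove(posInMap);
            // drop all older (and therefore possibly failed) checkpoints too
            for (int i = 0; i < posInMap; i++) {
                pendingCheckpoints.remove(0);
            }
            return offsets;
        }
    }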


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7d294735
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7d294735
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7d294735

Branch: refs/heads/master
Commit: 7d294735eb3a77debbe2215c44dea4f97bcf7165
Parents: ac43a69
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu May 14 11:45:30 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Jun 1 17:00:02 2015 +0200

----------------------------------------------------------------------
 .../api/persistent/PersistentKafkaSource.java   | 77 +++++++++++++-----
 .../streaming/connectors/kafka/KafkaITCase.java | 86 +++++++++++++++++++-
 .../streaming/api/checkpoint/Checkpointed.java  |  4 +-
 3 files changed, 140 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
index 032ed08..84fd7b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java
@@ -30,6 +30,7 @@ import kafka.utils.ZkUtils;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
 import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
@@ -50,7 +51,6 @@ import java.io.ObjectOutputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -65,20 +65,26 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		ResultTypeQueryable<OUT>,
 		CheckpointCommitter,
 		CheckpointedAsynchronously<long[]> {
+
+	private static final long serialVersionUID = 287845877188312621L;
+	
 	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
 
+	
+	private final String topicName;
+	private final DeserializationSchema<OUT> deserializationSchema;
+	
 	protected transient ConsumerConfig consumerConfig;
 	private transient ConsumerIterator<byte[], byte[]> iteratorToRead;
 	private transient ConsumerConnector consumer;
-
-	private String topicName;
-	private DeserializationSchema<OUT> deserializationSchema;
-	private boolean running = true;
-
-	private transient long[] lastOffsets;
+	
 	private transient ZkClient zkClient;
+	private transient long[] lastOffsets;
 	private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again.
-
+	private transient long[] restoreState;
+	
+	private final LinkedMap pendingCheckpoints = new LinkedMap();
+	
 	// We set this in reachedEnd to carry it over to next()
 	private OUT nextElement = null;
 
@@ -133,19 +139,33 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 		this.consumer = consumer;
 
 		zkClient = new ZkClient(consumerConfig.zkConnect(),
-			consumerConfig.zkSessionTimeoutMs(),
-			consumerConfig.zkConnectionTimeoutMs(),
-			new KafkaZKStringSerializer());
+				consumerConfig.zkSessionTimeoutMs(),
+				consumerConfig.zkConnectionTimeoutMs(),
+				new KafkaZKStringSerializer());
 
 		// most likely the number of offsets we're going to store here will be lower than the number of partitions.
 		int numPartitions = getNumberOfPartitions();
 		LOG.debug("The topic {} has {} partitions", topicName, numPartitions);
 		this.lastOffsets = new long[numPartitions];
 		this.commitedOffsets = new long[numPartitions];
-		Arrays.fill(this.lastOffsets, -1);
-		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+		// check if there are offsets to restore
+		if (restoreState != null) {
+			if (restoreState.length != numPartitions) {
+				throw new IllegalStateException("There are "+restoreState.length+" offsets to restore for topic "+topicName+" but " +
+						"there are only "+numPartitions+" in the topic");
+			}
 
+			LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(restoreState));
+			setOffsetsInZooKeeper(restoreState);
+			this.lastOffsets = restoreState;
+		} else {
+			// initialize empty offsets
+			Arrays.fill(this.lastOffsets, -1);
+		}
+		Arrays.fill(this.commitedOffsets, 0); // just to make it clear
+		
 		nextElement = null;
+		pendingCheckpoints.clear();
 	}
 
 	@Override
@@ -200,10 +220,9 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	// ---------------------- State / Checkpoint handling  -----------------
 	// this source is keeping the partition offsets in Zookeeper
 
-	private Map<Long, long[]> pendingCheckpoints = new HashMap<Long, long[]>();
-
 	@Override
 	public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+
 		if(lastOffsets == null) {
 			LOG.warn("State snapshot requested on not yet opened source. Returning null");
 			return null;
@@ -217,7 +236,8 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 
 	@Override
 	public void restoreState(long[] state) {
-		// we maintain the offsets in Kafka, so nothing to do.
+		LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state));
+		this.restoreState = Arrays.copyOf(state, state.length);
 	}
 
 
@@ -228,15 +248,28 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 	@Override
 	public void commitCheckpoint(long checkpointId) {
 		LOG.info("Commit checkpoint {}", checkpointId);
-		long[] checkpointOffsets = pendingCheckpoints.remove(checkpointId);
-		if(checkpointOffsets == null) {
+		final int posInMap = pendingCheckpoints.indexOf(checkpointId);
+		if(posInMap == -1) {
 			LOG.warn("Unable to find pending checkpoint for id {}", checkpointId);
 			return;
 		}
-		LOG.info("Got corresponding offsets {}", Arrays.toString(checkpointOffsets));
 
-		for(int partition = 0; partition < checkpointOffsets.length; partition++) {
-			long offset = checkpointOffsets[partition];
+		long[] checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap);
+		LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets));
+
+		// remove older checkpoints in map:
+		if(!pendingCheckpoints.isEmpty()) {
+			for(int i = 0; i < posInMap; i++) {
+				pendingCheckpoints.remove(0);
+			}
+		}
+
+		setOffsetsInZooKeeper(checkpointOffsets);
+	}
+
+	private void setOffsetsInZooKeeper(long[] offsets) {
+		for(int partition = 0; partition < offsets.length; partition++) {
+			long offset = offsets[partition];
 			if(offset != -1) {
 				setOffset(partition, offset);
 			}
@@ -335,4 +368,4 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT>
 			}
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 7b7bdcc..51682ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -18,10 +18,10 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -44,10 +44,12 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import kafka.network.SocketServer;
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.collections.map.LinkedMap;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -181,6 +183,82 @@ public class KafkaITCase {
 		zkClient.close();
 	}
 
+	// --------------------------  test checkpointing ------------------------
+	@Test
+	public void testCheckpointing() throws Exception {
+		createTestTopic("testCheckpointing", 1, 1);
+
+		Properties props = new Properties();
+		props.setProperty("zookeeper.connect", zookeeperConnectionString);
+		props.setProperty("group.id", "testCheckpointing");
+		props.setProperty("auto.commit.enable", "false");
+		ConsumerConfig cc = new ConsumerConfig(props);
+		PersistentKafkaSource<String> source = new PersistentKafkaSource<String>("testCheckpointing", new FakeDeserializationSchema(), cc);
+
+
+		Field pendingCheckpointsField = PersistentKafkaSource.class.getDeclaredField("pendingCheckpoints");
+		pendingCheckpointsField.setAccessible(true);
+		LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source);
+
+
+		Assert.assertEquals(0, pendingCheckpoints.size());
+		// first restore
+		source.restoreState(new long[]{1337});
+		// then open
+		source.open(new Configuration());
+		long[] state1 = source.snapshotState(1, 15);
+		Assert.assertArrayEquals(new long[]{1337}, state1);
+		long[] state2 = source.snapshotState(2, 30);
+		Assert.assertArrayEquals(new long[]{1337}, state2);
+		Assert.assertEquals(2, pendingCheckpoints.size());
+
+		source.commitCheckpoint(1);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		source.commitCheckpoint(2);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		source.commitCheckpoint(666); // invalid checkpoint
+		Assert.assertEquals(0, pendingCheckpoints.size());
+
+		// create 500 snapshots
+		for(int i = 0; i < 500; i++) {
+			source.snapshotState(i, 15 * i);
+		}
+		Assert.assertEquals(500, pendingCheckpoints.size());
+
+		// commit only the second last
+		source.commitCheckpoint(498);
+		Assert.assertEquals(1, pendingCheckpoints.size());
+
+		// access invalid checkpoint
+		source.commitCheckpoint(490);
+
+		// and the last
+		source.commitCheckpoint(499);
+		Assert.assertEquals(0, pendingCheckpoints.size());
+	}
+
+	private static class FakeDeserializationSchema implements DeserializationSchema<String> {
+
+		@Override
+		public String deserialize(byte[] message) {
+			return null;
+		}
+
+		@Override
+		public boolean isEndOfStream(String nextElement) {
+			return false;
+		}
+
+		@Override
+		public TypeInformation<String> getProducedType() {
+			return null;
+		}
+	}
+
+	// ---------------------------------------------------------------
+
 
 	@Test
 	public void testOffsetManipulation() {
@@ -234,9 +312,9 @@ public class KafkaITCase {
 		long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0);
 		long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1);
 		long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2);
-		Assert.assertTrue("The offset seems incorrect, got "+o1, o1 > 50L);
-		Assert.assertTrue("The offset seems incorrect, got "+o2, o2 > 50L);
-		Assert.assertTrue("The offset seems incorrect, got "+o3, o3 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got " + o1, o1 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got " + o2, o2 > 50L);
+		Assert.assertTrue("The offset seems incorrect, got " + o3, o3 > 50L);
 		/** Once we have proper shutdown of streaming jobs, enable these tests
 		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk,
standardCC.groupId(), topicName, 0));
 		Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk,
standardCC.groupId(), topicName, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/7d294735/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
index 2cab7a3..cb49dba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java
@@ -56,7 +56,9 @@ public interface Checkpointed<T extends Serializable> {
 	/**
 	 * Restores the state of the function or operator to that of a previous checkpoint.
 	 * This method is invoked when a function is executed as part of a recovery run.
-	 * 	 * 
+	 *
+	 * Note that restoreState() is called before open().
+	 *
 	 * @param state The state to be restored. 
 	 */
 	void restoreState(T state);
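
The javadoc note added above documents the contract the Kafka source now
relies on: restoreState() runs before open(), so restored state can only be
buffered in a field and applied once open() has created the resources it
needs (here, the ZooKeeper client). An illustrative sketch of that pattern
(not part of this commit; the class and the open() signature are hypothetical):

    import java.util.Arrays;

    import org.apache.flink.streaming.api.checkpoint.Checkpointed;

    public class OffsetRestoringSource implements Checkpointed<long[]> {

        private transient long[] restoredOffsets;  // buffered by restoreState()
        private transient long[] currentOffsets;   // created in open()

        @Override
        public void restoreState(long[] state) {
            // recovery path, runs before open(): only remember the state
            this.restoredOffsets = Arrays.copyOf(state, state.length);
        }

        // hypothetical stand-in for RichFunction#open(Configuration)
        public void open(int numPartitions) {
            currentOffsets = new long[numPartitions];
            Arrays.fill(currentOffsets, -1);
            if (restoredOffsets != null) {
                // resources exist now, so the buffered state can be applied
                System.arraycopy(restoredOffsets, 0, currentOffsets, 0, restoredOffsets.length);
            }
        }

        @Override
        public long[] snapshotState(long checkpointId, long checkpointTimestamp) {
            return Arrays.copyOf(currentOffsets, currentOffsets.length);
        }
    }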

