flink-commits mailing list archives

From rmetz...@apache.org
Subject [6/9] flink git commit: [FLINK-3058] Add support for Kafka 0.9.0.0
Date Wed, 20 Jan 2016 19:31:51 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
new file mode 100644
index 0000000..1168b27
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartition.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A serializable representation of a Kafka topic and a partition.
+ * Used as operator state for the Kafka consumer.
+ */
+public class KafkaTopicPartition implements Serializable {
+
+	private static final long serialVersionUID = 722083576322742325L;
+
+	private final String topic;
+	private final int partition;
+	private final int cachedHash;
+
+	public KafkaTopicPartition(String topic, int partition) {
+		this.topic = checkNotNull(topic);
+		this.partition = partition;
+		this.cachedHash = 31 * topic.hashCode() + partition;
+	}
+
+	public String getTopic() {
+		return topic;
+	}
+
+	public int getPartition() {
+		return partition;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartition{" +
+				"topic='" + topic + '\'' +
+				", partition=" + partition +
+				'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartition)) {
+			return false;
+		}
+
+		KafkaTopicPartition that = (KafkaTopicPartition) o;
+
+		if (partition != that.partition) {
+			return false;
+		}
+		return topic.equals(that.topic);
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+
+	// ------------------- Utilities -------------------------------------
+
+	/**
+	 * Returns the unique topics contained in the given topic partition map.
+	 *
+	 * @param topicPartitionMap A map keyed by KafkaTopicPartition
+	 * @return A list of the unique topics in the input map
+	 */
+	public static List<String> getTopics(Map<KafkaTopicPartition, ?> topicPartitionMap) {
+		HashSet<String> uniqueTopics = new HashSet<>();
+		for (KafkaTopicPartition ktp: topicPartitionMap.keySet()) {
+			uniqueTopics.add(ktp.getTopic());
+		}
+		return new ArrayList<>(uniqueTopics);
+	}
+
+	public static String toString(Map<KafkaTopicPartition, Long> map) {
+		StringBuilder sb = new StringBuilder();
+		for (Map.Entry<KafkaTopicPartition, Long> p: map.entrySet()) {
+			KafkaTopicPartition ktp = p.getKey();
+			sb.append(ktp.getTopic()).append(":").append(ktp.getPartition()).append("=").append(p.getValue()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	/**
+	 * Checks whether this partition is contained in the given map of KafkaTopicPartitionLeaders.
+	 *
+	 * @param map The map of KafkaTopicPartitionLeaders
+	 * @return true if this partition is contained in the map's keys.
+	 */
+	public boolean isContained(Map<KafkaTopicPartitionLeader, ?> map) {
+		for(Map.Entry<KafkaTopicPartitionLeader, ?> entry : map.entrySet()) {
+			if(entry.getKey().getTopicPartition().equals(this)) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	public static List<KafkaTopicPartition> convertToPartitionInfo(List<KafkaTopicPartitionLeader> partitionInfos) {
+		List<KafkaTopicPartition> ret = new ArrayList<>(partitionInfos.size());
+		for(KafkaTopicPartitionLeader ktpl: partitionInfos) {
+			ret.add(ktpl.getTopicPartition());
+		}
+		return ret;
+	}
+}
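
A short usage sketch (not part of this commit) of the utility methods above; the topic names and offsets are made up for illustration:

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class KafkaTopicPartitionExample {
        public static void main(String[] args) {
            // Hypothetical offset map, as the consumer might keep it as operator state.
            Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new KafkaTopicPartition("clicks", 0), 42L);
            offsets.put(new KafkaTopicPartition("clicks", 1), 17L);
            offsets.put(new KafkaTopicPartition("impressions", 0), 5L);

            // Unique topics referenced by the state.
            List<String> topics = KafkaTopicPartition.getTopics(offsets);
            System.out.println(topics); // e.g. [clicks, impressions] (order not guaranteed)

            // Human-readable dump of the offsets, useful for logging.
            System.out.println(KafkaTopicPartition.toString(offsets));
        }
    }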

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
new file mode 100644
index 0000000..8dd9a52
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/KafkaTopicPartitionLeader.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.kafka.common.Node;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializable Topic Partition info with leader Node information.
+ * This class is used at runtime.
+ */
+public class KafkaTopicPartitionLeader implements Serializable {
+
+	private static final long serialVersionUID = 9145855900303748582L;
+
+	private final int leaderId;
+	private final int leaderPort;
+	private final String leaderHost;
+	private final KafkaTopicPartition topicPartition;
+	private final int cachedHash;
+
+	public KafkaTopicPartitionLeader(KafkaTopicPartition topicPartition, Node leader) {
+		this.topicPartition = topicPartition;
+		if (leader == null) {
+			this.leaderId = -1;
+			this.leaderHost = null;
+			this.leaderPort = -1;
+		} else {
+			this.leaderId = leader.id();
+			this.leaderPort = leader.port();
+			this.leaderHost = leader.host();
+		}
+		int cachedHash = (leader == null) ? 14 : leader.hashCode();
+		this.cachedHash = 31 * cachedHash + topicPartition.hashCode();
+	}
+
+	public KafkaTopicPartition getTopicPartition() {
+		return topicPartition;
+	}
+
+	public Node getLeader() {
+		if (this.leaderId == -1) {
+			return null;
+		} else {
+			return new Node(leaderId, leaderHost, leaderPort);
+		}
+	}
+
+	public static String toString(List<KafkaTopicPartitionLeader> partitions) {
+		StringBuilder sb = new StringBuilder();
+		for (KafkaTopicPartitionLeader p: partitions) {
+			sb.append(p.getTopicPartition().getTopic()).append(":").append(p.getTopicPartition().getPartition()).append(", ");
+		}
+		return sb.toString();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KafkaTopicPartitionLeader)) {
+			return false;
+		}
+
+		KafkaTopicPartitionLeader that = (KafkaTopicPartitionLeader) o;
+
+		if (!topicPartition.equals(that.topicPartition)) {
+			return false;
+		}
+		return leaderId == that.leaderId && leaderPort == that.leaderPort && leaderHost.equals(that.leaderHost);
+	}
+
+	@Override
+	public int hashCode() {
+		return cachedHash;
+	}
+
+	@Override
+	public String toString() {
+		return "KafkaTopicPartitionLeader{" +
+				"leaderId=" + leaderId +
+				", leaderPort=" + leaderPort +
+				", leaderHost='" + leaderHost + '\'' +
+				", topic=" + topicPartition.getTopic() +
+				", partition=" + topicPartition.getPartition() +
+				'}';
+	}
+
+
+	/**
+	 * Replaces the entry for this topic partition in the given map, ignoring which leader the existing key refers to.
+	 *
+	 * @param newKey the new topic partition (with its current leader)
+	 * @param newValue the new offset
+	 * @param map the map to search in
+	 * @return the old value (offset), or null if the partition was not contained in the map
+	 */
+	public static Long replaceIgnoringLeader(KafkaTopicPartitionLeader newKey, Long newValue, Map<KafkaTopicPartitionLeader, Long> map) {
+		for(Map.Entry<KafkaTopicPartitionLeader, Long> entry: map.entrySet()) {
+			if(entry.getKey().getTopicPartition().equals(newKey.getTopicPartition())) {
+				Long oldValue = map.remove(entry.getKey());
+				if(map.put(newKey, newValue) != null) {
+					throw new IllegalStateException("Key was not removed before");
+				}
+				return oldValue;
+			}
+		}
+		return null;
+	}
+}
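
A sketch (not part of this commit) of how replaceIgnoringLeader behaves when a partition's leader changes; the broker hosts and offsets are made up:

    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
    import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
    import org.apache.kafka.common.Node;

    import java.util.HashMap;
    import java.util.Map;

    public class LeaderChangeExample {
        public static void main(String[] args) {
            KafkaTopicPartition tp = new KafkaTopicPartition("clicks", 0);

            // Offset map keyed by partition plus its current leader.
            Map<KafkaTopicPartitionLeader, Long> offsets = new HashMap<>();
            offsets.put(new KafkaTopicPartitionLeader(tp, new Node(1, "broker-1", 9092)), 100L);

            // After a leader change, the same partition is keyed by a different leader node.
            KafkaTopicPartitionLeader newLeaderKey =
                    new KafkaTopicPartitionLeader(tp, new Node(2, "broker-2", 9092));

            // Replace the entry for this partition, regardless of which broker leads it.
            Long oldOffset = KafkaTopicPartitionLeader.replaceIgnoringLeader(newLeaderKey, 250L, offsets);
            System.out.println("previous offset: " + oldOffset); // 100
            System.out.println(offsets); // the map now holds one entry for clicks-0 with offset 250
        }
    }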

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
new file mode 100644
index 0000000..001b6cb
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ZooKeeperStringSerializer.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * Simple ZooKeeper serializer for Strings.
+ */
+public class ZooKeeperStringSerializer implements ZkSerializer {
+
+	private static final Charset CHARSET = Charset.forName("UTF-8");
+	
+	@Override
+	public byte[] serialize(Object data) {
+		if (data instanceof String) {
+			return ((String) data).getBytes(CHARSET);
+		}
+		else {
+			throw new IllegalArgumentException("ZooKeeperStringSerializer can only serialize strings.");
+		}
+	}
+
+	@Override
+	public Object deserialize(byte[] bytes) {
+		if (bytes == null) {
+			return null;
+		}
+		else {
+			return new String(bytes, CHARSET);
+		}
+	}
+}
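
A minimal round trip (not part of this commit) showing the serializer's behavior, including the null pass-through on deserialization:

    import org.apache.flink.streaming.connectors.kafka.internals.ZooKeeperStringSerializer;

    public class ZkSerializerExample {
        public static void main(String[] args) {
            ZooKeeperStringSerializer serializer = new ZooKeeperStringSerializer();

            byte[] bytes = serializer.serialize("/consumers/my-group/offsets");
            String back = (String) serializer.deserialize(bytes);

            System.out.println(back);                         // /consumers/my-group/offsets
            System.out.println(serializer.deserialize(null)); // null is passed through
        }
    }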

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
new file mode 100644
index 0000000..a038711
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/AvgKafkaMetricAccumulator.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+import java.util.List;
+
+public class AvgKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
+	private static final Logger LOG = LoggerFactory.getLogger(AvgKafkaMetricAccumulator.class);
+
+	/** The last sum/count before serialization. */
+	private AvgSumCount lastSumCount;
+
+	public AvgKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+		super(kafkaMetric);
+	}
+
+	@Override
+	public void merge(Accumulator<Void, Double> other) {
+		if(!(other instanceof AvgKafkaMetricAccumulator)) {
+			throw new RuntimeException("Trying to merge incompatible accumulators: "+this+" with "+other);
+		}
+		AvgKafkaMetricAccumulator otherMetric = (AvgKafkaMetricAccumulator) other;
+
+		AvgSumCount thisAvg;
+		if(this.lastSumCount == null) {
+			Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
+			if (!(thisMeasurable instanceof Avg)) {
+				throw new RuntimeException("Must be of type Avg");
+			}
+			thisAvg = getAvgSumCount((Avg) thisMeasurable);
+		} else {
+			thisAvg = this.lastSumCount;
+		}
+
+		AvgSumCount otherAvg;
+		if(otherMetric.lastSumCount == null) {
+			Measurable otherMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(otherMetric.kafkaMetric);
+			if(!(otherMeasurable instanceof Avg) ) {
+				throw new RuntimeException("Must be of type Avg");
+			}
+			otherAvg = getAvgSumCount((Avg) otherMeasurable);
+		} else {
+			otherAvg = otherMetric.lastSumCount;
+		}
+
+		thisAvg.count += otherAvg.count;
+		thisAvg.sum += otherAvg.sum;
+		this.mergedValue = thisAvg.sum / thisAvg.count;
+	}
+
+	@Override
+	public Accumulator<Void, Double> clone() {
+		AvgKafkaMetricAccumulator clone = new AvgKafkaMetricAccumulator(kafkaMetric);
+		clone.lastSumCount = this.lastSumCount;
+		clone.isMerged = this.isMerged;
+		clone.mergedValue = this.mergedValue;
+		return clone;
+	}
+
+	// ------------ Utilities
+
+	private static class AvgSumCount implements Serializable {
+		double sum;
+		long count;
+
+		@Override
+		public String toString() {
+			return "AvgSumCount{" +
+					"sum=" + sum +
+					", count=" + count +
+					", avg="+(sum/count)+"}";
+		}
+	}
+
+	/**
+	 * Extracts sum and count from an Avg using reflection.
+	 *
+	 * @param avg Avg SampledStat from Kafka
+	 * @return An AvgSumCount holding the average's sum and count
+	 */
+	private static AvgSumCount getAvgSumCount(Avg avg) {
+		try {
+			Field samplesField = SampledStat.class.getDeclaredField("samples");
+			Field sampleValue = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("value");
+			Field sampleEventCount = Class.forName("org.apache.kafka.common.metrics.stats.SampledStat$Sample").getDeclaredField("eventCount");
+			samplesField.setAccessible(true);
+			sampleValue.setAccessible(true);
+			sampleEventCount.setAccessible(true);
+			List samples = (List) samplesField.get(avg);
+			AvgSumCount res = new AvgSumCount();
+			for(int i = 0; i < samples.size(); i++) {
+				res.sum += (double)sampleValue.get(samples.get(i));
+				res.count += (long)sampleEventCount.get(samples.get(i));
+			}
+			return res;
+		} catch(Throwable t) {
+			throw new RuntimeException("Unable to extract sum and count from Avg using reflection. " +
+					"You can turn off the metrics from Flink's Kafka connector if this issue persists.", t);
+		}
+	}
+
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		Measurable thisMeasurable = DefaultKafkaMetricAccumulator.getMeasurableFromKafkaMetric(this.kafkaMetric);
+		if(!(thisMeasurable instanceof Avg) ) {
+			throw new RuntimeException("Must be of type Avg");
+		}
+		this.lastSumCount = getAvgSumCount((Avg) thisMeasurable);
+		out.defaultWriteObject();
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+	}
+}
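
The merge above combines the raw sums and counts (extracted via reflection) rather than averaging the subtasks' averages, which would be wrong when the sample counts differ. A small, dependency-free illustration of that distinction (the numbers are made up):

    public class AvgMergeDemo {
        public static void main(String[] args) {
            // Subtask A: 2 samples with sum 10 -> average 5.0
            double sumA = 10.0; long countA = 2;
            // Subtask B: 8 samples with sum 16 -> average 2.0
            double sumB = 16.0; long countB = 8;

            // Merging sums and counts (what AvgKafkaMetricAccumulator does) gives the true global average.
            double merged = (sumA + sumB) / (countA + countB); // 26 / 10 = 2.6

            // Naively averaging the two averages over-weights the smaller sample.
            double naive = ((sumA / countA) + (sumB / countB)) / 2; // (5.0 + 2.0) / 2 = 3.5

            System.out.println("merged = " + merged + ", naive = " + naive);
        }
    }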

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
new file mode 100644
index 0000000..06b7930
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/DefaultKafkaMetricAccumulator.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.Field;
+
+public class DefaultKafkaMetricAccumulator implements Accumulator<Void, Double>, Serializable {
+
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaMetricAccumulator.class);
+
+	protected boolean isMerged = false;
+	protected double mergedValue;
+	protected transient KafkaMetric kafkaMetric;
+
+
+	public static DefaultKafkaMetricAccumulator createFor(Metric metric) {
+		if(!(metric instanceof KafkaMetric)) {
+			return null;
+		}
+		KafkaMetric kafkaMetric = (KafkaMetric) metric;
+		Measurable measurable = getMeasurableFromKafkaMetric(kafkaMetric);
+		if(measurable == null) {
+			return null;
+		}
+		if (measurable instanceof Max) {
+			return new MaxKafkaMetricAccumulator(kafkaMetric);
+		} else if (measurable instanceof Min) {
+			return new MinKafkaMetricAccumulator(kafkaMetric);
+		} else if (measurable instanceof Avg) {
+			return new AvgKafkaMetricAccumulator(kafkaMetric);
+		} else {
+			// fallback accumulator. works for Rate, Total, Count.
+			return new DefaultKafkaMetricAccumulator(kafkaMetric);
+		}
+	}
+
+	/**
+	 * This utility method uses reflection to get the Measurable from the KafkaMetric.
+	 * Since Kafka 0.9, the Measurable is exposed properly, but Kafka 0.8.2 does not yet expose it.
+	 *
+	 * @param kafkaMetric the metric to extract the field from
+	 * @return the Measurable (or null in case of an error)
+	 */
+	protected static Measurable getMeasurableFromKafkaMetric(KafkaMetric kafkaMetric) {
+		try {
+			Field measurableField = kafkaMetric.getClass().getDeclaredField("measurable");
+			measurableField.setAccessible(true);
+			return (Measurable) measurableField.get(kafkaMetric);
+		} catch (Throwable e) {
+			LOG.warn("Unable to initialize Kafka metric: " + kafkaMetric, e);
+			return null;
+		}
+	}
+
+
+	DefaultKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+		this.kafkaMetric = kafkaMetric;
+	}
+
+	@Override
+	public void add(Void value) {
+		// noop
+	}
+
+	@Override
+	public Double getLocalValue() {
+		if(isMerged && kafkaMetric == null) {
+			return mergedValue;
+		}
+		return kafkaMetric.value();
+	}
+
+	@Override
+	public void resetLocal() {
+		// noop
+	}
+
+	@Override
+	public void merge(Accumulator<Void, Double> other) {
+		if(!(other instanceof DefaultKafkaMetricAccumulator)) {
+			throw new RuntimeException("Trying to merge incompatible accumulators");
+		}
+		DefaultKafkaMetricAccumulator otherMetric = (DefaultKafkaMetricAccumulator) other;
+		if(this.isMerged) {
+			if(otherMetric.isMerged) {
+				this.mergedValue += otherMetric.mergedValue;
+			} else {
+				this.mergedValue += otherMetric.getLocalValue();
+			}
+		} else {
+			this.isMerged = true;
+			if(otherMetric.isMerged) {
+				this.mergedValue = this.getLocalValue() + otherMetric.mergedValue;
+			} else {
+				this.mergedValue = this.getLocalValue() + otherMetric.getLocalValue();
+			}
+
+		}
+	}
+
+	@Override
+	public Accumulator<Void, Double> clone() {
+		DefaultKafkaMetricAccumulator clone = new DefaultKafkaMetricAccumulator(this.kafkaMetric);
+		clone.isMerged = this.isMerged;
+		clone.mergedValue = this.mergedValue;
+		return clone;
+	}
+
+	@Override
+	public String toString() {
+		if(isMerged) {
+			return Double.toString(mergedValue);
+		}
+		if(kafkaMetric == null) {
+			return "null";
+		}
+		return Double.toString(kafkaMetric.value());
+	}
+
+	// -------- custom serialization methods
+	private void writeObject(ObjectOutputStream out) throws IOException {
+		this.isMerged = true;
+		this.mergedValue = kafkaMetric.value();
+		out.defaultWriteObject();
+	}
+
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+	}
+}
\ No newline at end of file
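
A hedged sketch (not part of this commit) of how a connector could expose the Kafka client's metrics as Flink accumulators through the createFor factory above; the helper name, the metric-name format, and the registration point are assumptions for illustration only:

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.connectors.kafka.internals.metrics.DefaultKafkaMetricAccumulator;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    import java.util.Map;

    public class MetricBridgeSketch {

        // Registers one Flink accumulator per supported Kafka client metric.
        // The "KafkaConsumer-" prefix is an assumed naming convention, not the connector's.
        static void exposeKafkaMetrics(KafkaConsumer<?, ?> consumer, RuntimeContext runtimeContext) {
            for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
                DefaultKafkaMetricAccumulator accumulator =
                        DefaultKafkaMetricAccumulator.createFor(entry.getValue());
                if (accumulator == null) {
                    continue; // metric type not supported by the factory
                }
                String name = "KafkaConsumer-" + entry.getKey().group() + "-" + entry.getKey().name();
                runtimeContext.addAccumulator(name, accumulator);
            }
        }
    }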

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
new file mode 100644
index 0000000..c1770ff
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MaxKafkaMetricAccumulator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.metrics.KafkaMetric;
+
+
+public class MaxKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
+	public MaxKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+		super(kafkaMetric);
+	}
+
+	@Override
+	public void merge(Accumulator<Void, Double> other) {
+		if(!(other instanceof MaxKafkaMetricAccumulator)) {
+			throw new RuntimeException("Trying to merge incompatible accumulators");
+		}
+		MaxKafkaMetricAccumulator otherMetric = (MaxKafkaMetricAccumulator) other;
+		if(this.isMerged) {
+			if(otherMetric.isMerged) {
+				this.mergedValue = Math.max(this.mergedValue, otherMetric.mergedValue);
+			} else {
+				this.mergedValue = Math.max(this.mergedValue, otherMetric.getLocalValue());
+			}
+		} else {
+			this.isMerged = true;
+			if(otherMetric.isMerged) {
+				this.mergedValue = Math.max(this.getLocalValue(), otherMetric.mergedValue);
+			} else {
+				this.mergedValue = Math.max(this.getLocalValue(), otherMetric.getLocalValue());
+			}
+		}
+	}
+
+	@Override
+	public Accumulator<Void, Double> clone() {
+		MaxKafkaMetricAccumulator clone = new MaxKafkaMetricAccumulator(this.kafkaMetric);
+		clone.isMerged = this.isMerged;
+		clone.mergedValue = this.mergedValue;
+		return clone;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
new file mode 100644
index 0000000..4794893
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/metrics/MinKafkaMetricAccumulator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.internals.metrics;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.kafka.common.metrics.KafkaMetric;
+
+public class MinKafkaMetricAccumulator extends DefaultKafkaMetricAccumulator {
+
+	public MinKafkaMetricAccumulator(KafkaMetric kafkaMetric) {
+		super(kafkaMetric);
+	}
+
+	@Override
+	public void merge(Accumulator<Void, Double> other) {
+		if(!(other instanceof MinKafkaMetricAccumulator)) {
+			throw new RuntimeException("Trying to merge incompatible accumulators");
+		}
+		MinKafkaMetricAccumulator otherMetric = (MinKafkaMetricAccumulator) other;
+		if(this.isMerged) {
+			if(otherMetric.isMerged) {
+				this.mergedValue = Math.min(this.mergedValue, otherMetric.mergedValue);
+			} else {
+				this.mergedValue = Math.min(this.mergedValue, otherMetric.getLocalValue());
+			}
+		} else {
+			this.isMerged = true;
+			if(otherMetric.isMerged) {
+				this.mergedValue = Math.min(this.getLocalValue(), otherMetric.mergedValue);
+			} else {
+				this.mergedValue = Math.min(this.getLocalValue(), otherMetric.getLocalValue());
+			}
+		}
+	}
+
+	@Override
+	public Accumulator<Void, Double> clone() {
+		MinKafkaMetricAccumulator clone = new MinKafkaMetricAccumulator(this.kafkaMetric);
+		clone.isMerged = this.isMerged;
+		clone.mergedValue = this.mergedValue;
+		return clone;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
new file mode 100644
index 0000000..d9dcfc1
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FixedPartitioner.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+import java.io.Serializable;
+
+/**
+ * A partitioner ensuring that each internal Flink partition ends up in one Kafka partition.
+ *
+ * Note that one Kafka partition can contain multiple Flink partitions.
+ *
+ * Cases:
+ * 	# More Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2   --------------/
+ * 			3   -------------/
+ * 			4	------------/
+ * </pre>
+ * Some (or all) Kafka partitions contain the output of more than one Flink partition.
+ *
+ * 	# Fewer Flink partitions than Kafka partitions
+ * <pre>
+ * 		Flink Sinks:		Kafka Partitions
+ * 			1	----------------&gt;	1
+ * 			2	----------------&gt;	2
+ * 										3
+ * 										4
+ * 										5
+ * </pre>
+ *
+ * Not all Kafka partitions contain data.
+ * To avoid such an unbalanced partitioning, use a round-robin Kafka partitioner (note that this will
+ * cause a lot of network connections between all the Flink instances and all the Kafka brokers).
+ */
+public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
+	private static final long serialVersionUID = 1627268846962918126L;
+
+	int targetPartition = -1;
+
+	@Override
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		int p = 0;
+		for (int i = 0; i < parallelInstances; i++) {
+			if (i == parallelInstanceId) {
+				targetPartition = partitions[p];
+				return;
+			}
+			if (++p == partitions.length) {
+				p = 0;
+			}
+		}
+	}
+
+	@Override
+	public int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
+		if (targetPartition == -1) {
+			throw new RuntimeException("The partitioner has not been initialized properly");
+		}
+		return targetPartition;
+	}
+}
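
A standalone sketch (not part of this commit) that exercises open() and partition() directly to show the sink-to-partition mapping described in the Javadoc above; the record content is irrelevant to the result:

    import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;

    public class FixedPartitionerDemo {
        public static void main(String[] args) {
            int[] kafkaPartitions = {0, 1};   // two Kafka partitions
            int parallelism = 4;              // four parallel Flink sink instances

            for (int subtask = 0; subtask < parallelism; subtask++) {
                FixedPartitioner<String> partitioner = new FixedPartitioner<>();
                partitioner.open(subtask, parallelism, kafkaPartitions);
                // Every record from this subtask goes to the same Kafka partition.
                int target = partitioner.partition("some-record", null, null, kafkaPartitions.length);
                System.out.println("sink " + subtask + " -> Kafka partition " + target);
            }
            // Resulting mapping: sinks 0 and 2 -> partition 0, sinks 1 and 3 -> partition 1.
        }
    }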

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
new file mode 100644
index 0000000..038f414
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/KafkaPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.connectors.kafka.partitioner;
+
+
+import java.io.Serializable;
+
+/**
+ * Base class for Kafka partitioners. It contains an open() method which is called on each parallel instance.
+ * Partitioners must be serializable!
+ */
+public abstract class KafkaPartitioner<T> implements Serializable {
+
+	private static final long serialVersionUID = -1974260817778593473L;
+
+	/**
+	 * Initializer for the Partitioner.
+	 * @param parallelInstanceId 0-indexed id of the parallel instance in Flink
+	 * @param parallelInstances the total number of parallel instances
+	 * @param partitions an array describing the partition IDs of the available Kafka partitions.
+	 */
+	public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
+		// overwrite this method if needed.
+	}
+
+	public abstract int partition(T next, byte[] serializedKey, byte[] serializedValue, int numPartitions);
+}
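
One possible custom subclass (not part of this commit): a round-robin partitioner of the kind the FixedPartitioner Javadoc mentions, which spreads each subtask's output over all partitions at the cost of more broker connections.

    import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;

    public class RoundRobinPartitioner<T> extends KafkaPartitioner<T> {

        private static final long serialVersionUID = 1L;

        private int[] partitions;
        private int next;

        @Override
        public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
            this.partitions = partitions;
            // Start each subtask at a different offset so they do not all hit the same partition first.
            this.next = parallelInstanceId % partitions.length;
        }

        @Override
        public int partition(T record, byte[] serializedKey, byte[] serializedValue, int numPartitions) {
            int target = partitions[next];
            next = (next + 1) % partitions.length;
            return target;
        }
    }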

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
new file mode 100644
index 0000000..01e72ca
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte key / value messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ * 
+ * @param <T> The type created by the keyed deserialization schema.
+ */
+public interface KeyedDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+	/**
+	 * Deserializes the byte message.
+	 *
+	 * @param messageKey the key as a byte array (null if no key has been set)
+	 * @param message The message, as a byte array (null if the message was empty or deleted)
+	 * @param topic The topic the message has originated from
+	 * @param partition The partition the message has originated from
+	 * @param offset the offset of the message in the original source (for example the Kafka offset)
+	 *
+	 * @return The deserialized message as an object.
+	 */
+	T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
+
+	/**
+	 * Method to decide whether the element signals the end of the stream. If
+	 * true is returned the element won't be emitted.
+	 * 
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return True, if the element signals end of stream, false otherwise.
+	 */
+	boolean isEndOfStream(T nextElement);
+}
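
An illustrative implementation (not part of this commit, class name is hypothetical) that uses all of the per-record metadata handed to deserialize():

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

    import java.io.IOException;
    import java.nio.charset.Charset;

    public class KafkaRecordToStringSchema implements KeyedDeserializationSchema<String> {

        private static final long serialVersionUID = 1L;
        private static final Charset UTF8 = Charset.forName("UTF-8");

        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
            // Render each record as "topic/partition@offset: key=value".
            String key = messageKey == null ? "null" : new String(messageKey, UTF8);
            String value = message == null ? "null" : new String(message, UTF8);
            return topic + "/" + partition + "@" + offset + ": " + key + "=" + value;
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            return false; // this schema never ends the stream based on message content
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }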

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
new file mode 100644
index 0000000..4b9dba2
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+/**
+ * A simple wrapper for using the DeserializationSchema with the KeyedDeserializationSchema
+ * interface.
+ * @param <T> The type created by the deserialization schema.
+ */
+public class KeyedDeserializationSchemaWrapper<T> implements KeyedDeserializationSchema<T> {
+
+	private static final long serialVersionUID = 2651665280744549932L;
+
+	private final DeserializationSchema<T> deserializationSchema;
+
+	public KeyedDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
+		this.deserializationSchema = deserializationSchema;
+	}
+	@Override
+	public T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+		return deserializationSchema.deserialize(message);
+	}
+
+	@Override
+	public boolean isEndOfStream(T nextElement) {
+		return deserializationSchema.isEndOfStream(nextElement);
+	}
+
+	@Override
+	public TypeInformation<T> getProducedType() {
+		return deserializationSchema.getProducedType();
+	}
+}
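
A brief usage sketch (not part of this commit), assuming the existing SimpleStringSchema from the same package; the wrapper ignores the message key and the topic/partition/offset metadata:

    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
    import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class WrapperExample {
        public static void main(String[] args) throws Exception {
            // Reuse a plain DeserializationSchema where a keyed schema is expected.
            KeyedDeserializationSchema<String> schema =
                    new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());

            String value = schema.deserialize(null, "hello".getBytes("UTF-8"), "topic", 0, 0L);
            System.out.println(value); // hello
        }
    }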

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
new file mode 100644
index 0000000..be3e87e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data to be handed
+ * to them in a specific format (for example as byte strings).
+ * 
+ * @param <T> The type to be serialized.
+ */
+public interface KeyedSerializationSchema<T> extends Serializable {
+
+	/**
+	 * Serializes the key of the incoming element to a byte array.
+	 * This method might return null if no key is available.
+	 *
+	 * @param element The incoming element to be serialized
+	 * @return the key of the element as a byte array
+	 */
+	byte[] serializeKey(T element);
+
+
+	/**
+	 * Serializes the value of the incoming element to a byte array.
+	 * 
+	 * @param element The incoming element to be serialized
+	 * @return the value of the element as a byte array
+	 */
+	byte[] serializeValue(T element);
+
+}
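
An illustrative implementation (not part of this commit, class name is hypothetical) that writes the first tuple field as the Kafka message key and the second as the value:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

    import java.nio.charset.Charset;

    public class StringPairSerializationSchema implements KeyedSerializationSchema<Tuple2<String, String>> {

        private static final long serialVersionUID = 1L;
        private static final Charset UTF8 = Charset.forName("UTF-8");

        @Override
        public byte[] serializeKey(Tuple2<String, String> element) {
            // Returning null produces a message without a key.
            return element.f0 == null ? null : element.f0.getBytes(UTF8);
        }

        @Override
        public byte[] serializeValue(Tuple2<String, String> element) {
            return element.f1.getBytes(UTF8);
        }
    }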

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
new file mode 100644
index 0000000..a1a8fc0
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util.serialization;
+
+/**
+ * A simple wrapper for using the SerializationSchema with the KeyedSerializationSchema
+ * interface.
+ * @param <T> The type to serialize
+ */
+public class KeyedSerializationSchemaWrapper<T> implements KeyedSerializationSchema<T> {
+
+	private static final long serialVersionUID = 1351665280744549933L;
+
+	private final SerializationSchema<T> serializationSchema;
+
+	public KeyedSerializationSchemaWrapper(SerializationSchema<T> serializationSchema) {
+		this.serializationSchema = serializationSchema;
+	}
+
+	@Override
+	public byte[] serializeKey(T element) {
+		return null;
+	}
+
+	@Override
+	public byte[] serializeValue(T element) {
+		return serializationSchema.serialize(element);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
new file mode 100644
index 0000000..a35c01e
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationKeyValueSerializationSchema.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util.serialization;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.api.java.typeutils.runtime.ByteArrayInputView;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema for key-value pairs that uses Flink's serialization stack to
+ * transform typed records from and to byte arrays.
+ * 
+ * @param <K> The key type to be serialized.
+ * @param <V> The value type to be serialized.
+ */
+public class TypeInformationKeyValueSerializationSchema<K, V> implements KeyedDeserializationSchema<Tuple2<K, V>>, KeyedSerializationSchema<Tuple2<K,V>> {
+
+	private static final long serialVersionUID = -5359448468131559102L;
+
+	/** The serializer for the key */
+	private final TypeSerializer<K> keySerializer;
+
+	/** The serializer for the value */
+	private final TypeSerializer<V> valueSerializer;
+
+	/** reusable output serialization buffers */
+	private transient DataOutputSerializer keyOutputSerializer;
+	private transient DataOutputSerializer valueOutputSerializer;
+
+	/** The type information, to be returned by {@link #getProducedType()}. It is
+	 * transient, because it is not serializable. Note that this means that the type information
+	 * is not available at runtime, but only prior to the first serialization / deserialization */
+	private final transient TypeInformation<Tuple2<K, V>> typeInfo;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new de-/serialization schema for the given types.
+	 *
+	 * @param keyTypeInfo The type information for the key type de-/serialized by this schema.
+	 * @param valueTypeInfo The type information for the value type de-/serialized by this schema.
+	 * @param ec The execution config, which is used to parametrize the type serializers.
+	 */
+	public TypeInformationKeyValueSerializationSchema(TypeInformation<K> keyTypeInfo, TypeInformation<V> valueTypeInfo, ExecutionConfig ec) {
+		this.typeInfo = new TupleTypeInfo<>(keyTypeInfo, valueTypeInfo);
+		this.keySerializer = keyTypeInfo.createSerializer(ec);
+		this.valueSerializer = valueTypeInfo.createSerializer(ec);
+	}
+
+	public TypeInformationKeyValueSerializationSchema(Class<K> keyClass, Class<V> valueClass, ExecutionConfig config) {
+		//noinspection unchecked
+		this( (TypeInformation<K>) TypeExtractor.createTypeInfo(keyClass), (TypeInformation<V>) TypeExtractor.createTypeInfo(valueClass), config);
+	}
+
+	// ------------------------------------------------------------------------
+
+
+	@Override
+	public Tuple2<K, V> deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException {
+		K key = null;
+		if(messageKey != null) {
+			key = keySerializer.deserialize(new ByteArrayInputView(messageKey));
+		}
+		V value = null;
+		if(message != null) {
+			value = valueSerializer.deserialize(new ByteArrayInputView(message));
+		}
+		return new Tuple2<>(key, value);
+	}
+
+	/**
+	 * This schema never considers an element to signal end-of-stream, so this method always returns false.
+	 * @param nextElement The element to test for the end-of-stream signal.
+	 * @return Returns false.
+	 */
+	@Override
+	public boolean isEndOfStream(Tuple2<K,V> nextElement) {
+		return false;
+	}
+
+
+	@Override
+	public byte[] serializeKey(Tuple2<K, V> element) {
+		if(element.f0 == null) {
+			return null;
+		} else {
+			// key is not null. serialize it:
+			if (keyOutputSerializer == null) {
+				keyOutputSerializer = new DataOutputSerializer(16);
+			}
+			try {
+				keySerializer.serialize(element.f0, keyOutputSerializer);
+			}
+			catch (IOException e) {
+				throw new RuntimeException("Unable to serialize record", e);
+			}
+			// check if key byte array size changed
+			byte[] res = keyOutputSerializer.getByteArray();
+			if (res.length != keyOutputSerializer.length()) {
+				byte[] n = new byte[keyOutputSerializer.length()];
+				System.arraycopy(res, 0, n, 0, keyOutputSerializer.length());
+				res = n;
+			}
+			keyOutputSerializer.clear();
+			return res;
+		}
+	}
+
+	@Override
+	public byte[] serializeValue(Tuple2<K, V> element) {
+		// if the value is null, its serialized value is null as well.
+		if(element.f1 == null) {
+			return null;
+		}
+
+		if (valueOutputSerializer == null) {
+			valueOutputSerializer = new DataOutputSerializer(16);
+		}
+
+		try {
+			valueSerializer.serialize(element.f1, valueOutputSerializer);
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Unable to serialize record", e);
+		}
+
+		byte[] res = valueOutputSerializer.getByteArray();
+		if (res.length != valueOutputSerializer.length()) {
+			byte[] n = new byte[valueOutputSerializer.length()];
+			System.arraycopy(res, 0, n, 0, valueOutputSerializer.length());
+			res = n;
+		}
+		valueOutputSerializer.clear();
+		return res;
+	}
+
+
+	@Override
+	public TypeInformation<Tuple2<K,V>> getProducedType() {
+		if (typeInfo != null) {
+			return typeInfo;
+		}
+		else {
+			throw new IllegalStateException(
+					"The type information is not available after this class has been serialized and distributed.");
+		}
+	}
+}
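
A self-contained round trip (not part of this commit) showing how the schema serializes key and value separately and reassembles them; the topic, partition, and offset arguments are ignored by deserialize() here and are made up:

    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema;

    public class KeyValueSchemaRoundTrip {
        public static void main(String[] args) throws Exception {
            TypeInformationKeyValueSerializationSchema<String, Long> schema =
                    new TypeInformationKeyValueSerializationSchema<>(String.class, Long.class, new ExecutionConfig());

            Tuple2<String, Long> record = new Tuple2<>("clicks", 42L);

            // Serialize key and value separately, as a Kafka producer would.
            byte[] key = schema.serializeKey(record);
            byte[] value = schema.serializeValue(record);

            // Deserialize them back into a tuple.
            Tuple2<String, Long> back = schema.deserialize(key, value, "some-topic", 0, 0L);
            System.out.println(back); // (clicks,42)
        }
    }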

http://git-wip-us.apache.org/repos/asf/flink/blob/81320c1c/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
new file mode 100644
index 0000000..e86d51a
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerPartitionAssignmentTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
+import org.apache.kafka.common.Node;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+
+/**
+ * Tests that the partition assignment is deterministic and stable.
+ */
+public class KafkaConsumerPartitionAssignmentTest {
+
+	private final Node fake = new Node(1337, "localhost", 1337);
+
+	@Test
+	public void testPartitionsEqualConsumers() {
+		try {
+			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+
+			for (int i = 0; i < inPartitions.size(); i++) {
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(
+						inPartitions, inPartitions.size(), i);
+
+				assertNotNull(parts);
+				assertEquals(1, parts.size());
+				assertTrue(contains(inPartitions, parts.get(0).getTopicPartition().getPartition()));
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private boolean contains(List<KafkaTopicPartitionLeader> inPartitions, int partition) {
+		for (KafkaTopicPartitionLeader ktp: inPartitions) {
+			if (ktp.getTopicPartition().getPartition() == partition) {
+				return true;
+			}
+		}
+		return false;
+	}
+
+	@Test
+	public void testMultiplePartitionsPerConsumers() {
+		try {
+			final int[] partitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+
+			final List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
+			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+
+			for (int p : partitionIDs) {
+				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				partitions.add(part);
+				allPartitions.add(part);
+			}
+
+			final int numConsumers = 3;
+			final int minPartitionsPerConsumer = partitions.size() / numConsumers;
+			final int maxPartitionsPerConsumer = partitions.size() / numConsumers + 1;
+
+			for (int i = 0; i < numConsumers; i++) {
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() >= minPartitionsPerConsumer);
+				assertTrue(parts.size() <= maxPartitionsPerConsumer);
+
+				for (KafkaTopicPartitionLeader p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testPartitionsFewerThanConsumers() {
+		try {
+			List<KafkaTopicPartitionLeader> inPartitions = new ArrayList<>();
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 4), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 52), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 17), fake));
+			inPartitions.add(new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", 1), fake));
+
+			final Set<KafkaTopicPartitionLeader> allPartitions = new HashSet<>();
+			allPartitions.addAll(inPartitions);
+
+			final int numConsumers = 2 * inPartitions.size() + 3;
+
+			for (int i = 0; i < numConsumers; i++) {
+				List<KafkaTopicPartitionLeader> parts = FlinkKafkaConsumerBase.assignPartitions(inPartitions, numConsumers, i);
+
+				assertNotNull(parts);
+				assertTrue(parts.size() <= 1);
+
+				for (KafkaTopicPartitionLeader p : parts) {
+					// check that the element was actually contained
+					assertTrue(allPartitions.remove(p));
+				}
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testAssignEmptyPartitions() {
+		try {
+			List<KafkaTopicPartitionLeader> ep = new ArrayList<>();
+			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(ep, 4, 2);
+			assertNotNull(parts1);
+			assertTrue(parts1.isEmpty());
+
+			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(ep, 1, 0);
+			assertNotNull(parts2);
+			assertTrue(parts2.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testGrowingPartitionsRemainsStable() {
+		try {
+			final int[] newPartitionIDs = {4, 52, 17, 1, 2, 3, 89, 42, 31, 127, 14};
+			List<KafkaTopicPartitionLeader> newPartitions = new ArrayList<>();
+
+			for (int p : newPartitionIDs) {
+				KafkaTopicPartitionLeader part = new KafkaTopicPartitionLeader(new KafkaTopicPartition("test-topic", p), fake);
+				newPartitions.add(part);
+			}
+
+			List<KafkaTopicPartitionLeader> initialPartitions = newPartitions.subList(0, 7);
+
+			final Set<KafkaTopicPartitionLeader> allNewPartitions = new HashSet<>(newPartitions);
+			final Set<KafkaTopicPartitionLeader> allInitialPartitions = new HashSet<>(initialPartitions);
+
+			final int numConsumers = 3;
+			final int minInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers;
+			final int maxInitialPartitionsPerConsumer = initialPartitions.size() / numConsumers + 1;
+			final int minNewPartitionsPerConsumer = newPartitions.size() / numConsumers;
+			final int maxNewPartitionsPerConsumer = newPartitions.size() / numConsumers + 1;
+
+			List<KafkaTopicPartitionLeader> parts1 = FlinkKafkaConsumerBase.assignPartitions(
+					initialPartitions, numConsumers, 0);
+			List<KafkaTopicPartitionLeader> parts2 = FlinkKafkaConsumerBase.assignPartitions(
+					initialPartitions, numConsumers, 1);
+			List<KafkaTopicPartitionLeader> parts3 = FlinkKafkaConsumerBase.assignPartitions(
+					initialPartitions, numConsumers, 2);
+
+			assertNotNull(parts1);
+			assertNotNull(parts2);
+			assertNotNull(parts3);
+
+			assertTrue(parts1.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts1.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts2.size() <= maxInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() >= minInitialPartitionsPerConsumer);
+			assertTrue(parts3.size() <= maxInitialPartitionsPerConsumer);
+
+			for (KafkaTopicPartitionLeader p : parts1) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts2) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts3) {
+				// check that the element was actually contained
+				assertTrue(allInitialPartitions.remove(p));
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allInitialPartitions.isEmpty());
+
+			// grow the set of partitions and distribute anew
+
+			List<KafkaTopicPartitionLeader> parts1new = FlinkKafkaConsumerBase.assignPartitions(
+					newPartitions, numConsumers, 0);
+			List<KafkaTopicPartitionLeader> parts2new = FlinkKafkaConsumerBase.assignPartitions(
+					newPartitions, numConsumers, 1);
+			List<KafkaTopicPartitionLeader> parts3new = FlinkKafkaConsumerBase.assignPartitions(
+					newPartitions, numConsumers, 2);
+
+			// new partitions must include all old partitions
+
+			assertTrue(parts1new.size() > parts1.size());
+			assertTrue(parts2new.size() > parts2.size());
+			assertTrue(parts3new.size() > parts3.size());
+
+			assertTrue(parts1new.containsAll(parts1));
+			assertTrue(parts2new.containsAll(parts2));
+			assertTrue(parts3new.containsAll(parts3));
+
+			assertTrue(parts1new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts1new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts2new.size() <= maxNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() >= minNewPartitionsPerConsumer);
+			assertTrue(parts3new.size() <= maxNewPartitionsPerConsumer);
+
+			for (KafkaTopicPartitionLeader p : parts1new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts2new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p));
+			}
+			for (KafkaTopicPartitionLeader p : parts3new) {
+				// check that the element was actually contained
+				assertTrue(allNewPartitions.remove(p));
+			}
+
+			// all partitions must have been assigned
+			assertTrue(allNewPartitions.isEmpty());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+}
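
The assertions above pin down three properties of FlinkKafkaConsumerBase.assignPartitions(partitions, numConsumers, consumerIndex): every partition is assigned to exactly one consumer, each consumer receives between partitions.size() / numConsumers and partitions.size() / numConsumers + 1 partitions, and a consumer keeps all of its previous partitions when new partitions are appended to the list. One assignment rule with all three properties, shown here purely as an illustration (the production implementation lives elsewhere in this commit and is not reproduced in this hunk), is index-modulo assignment:

	// Illustrative sketch only: assigns partition i to consumer (i % numConsumers).
	// Appending new partitions leaves the indices of existing partitions unchanged,
	// which is why a consumer's old assignment stays a subset of its new one.
	static List<KafkaTopicPartitionLeader> assignRoundRobin(
			List<KafkaTopicPartitionLeader> partitions, int numConsumers, int consumerIndex) {
		List<KafkaTopicPartitionLeader> assigned = new ArrayList<>();
		for (int i = 0; i < partitions.size(); i++) {
			if (i % numConsumers == consumerIndex) {
				assigned.add(partitions.get(i));
			}
		}
		return assigned;
	}

With such a rule each consumer ends up with either the floor or the ceiling of partitions.size() / numConsumers partitions, which matches the min/max bounds asserted in testMultiplePartitionsPerConsumers and testGrowingPartitionsRemainsStable.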

