flink-commits mailing list archives

From se...@apache.org
Subject [20/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
Date Thu, 27 Aug 2015 11:25:37 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
new file mode 100644
index 0000000..90a1cfa
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/RequestFutureListener.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Listener interface to hook into RequestFuture completion.
+ */
+public interface RequestFutureListener<T> {
+
+    void onSuccess(T value);
+
+    void onFailure(RuntimeException e);
+}
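
A typical consumer of this interface registers an anonymous listener on a pending request future. A minimal sketch, assuming a RequestFuture#addListener method and a ClientResponse result type from the same backport (neither appears in this file):

    future.addListener(new RequestFutureListener<ClientResponse>() {
        @Override
        public void onSuccess(ClientResponse response) {
            // handle the completed request
        }

        @Override
        public void onFailure(RuntimeException e) {
            // propagate the error or schedule a retry
        }
    });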

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
new file mode 100644
index 0000000..d94486e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SendFailedException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.common.errors.RetriableException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Exception used in {@link ConsumerNetworkClient} to indicate the failure
+ * to transmit a request to the networking layer. This could be either because
+ * the client is still connecting to the given host or its send buffer is full.
+ */
+public class SendFailedException extends RetriableException {
+    public static final SendFailedException INSTANCE = new SendFailedException();
+
+    private static final long serialVersionUID = 1L;
+
+}
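
Because the exception carries no per-failure state, the shared INSTANCE spares the cost of constructing (and stack-trace filling) a new exception on every full send buffer. A minimal sketch of the retry pattern a retriable exception enables; sendWithRetry and its arguments are hypothetical, not part of this commit:

    // Hypothetical helper: retry any action that may fail with a retriable error.
    static void sendWithRetry(Runnable attempt, long backoffMs) throws InterruptedException {
        while (true) {
            try {
                attempt.run();                 // may throw SendFailedException.INSTANCE
                return;
            } catch (RetriableException e) {   // SendFailedException is retriable
                Thread.sleep(backoffMs);       // wait for the connection or buffer to clear
            }
        }
    }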

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
new file mode 100644
index 0000000..adff6e0
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/StaleMetadataException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.common.errors.InvalidMetadataException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Thrown when metadata is old and needs to be refreshed.
+ */
+public class StaleMetadataException extends InvalidMetadataException {
+    private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
new file mode 100644
index 0000000..f5e8802
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/internals/SubscriptionState.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer.internals;
+
+import org.apache.flink.kafka_backport.clients.consumer.OffsetResetStrategy;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A class for tracking the topics, partitions, and offsets for the consumer
+ */
+public class SubscriptionState {
+
+    /* the list of topics the user has requested */
+    private final Set<String> subscribedTopics;
+
+    /* the list of partitions the user has requested */
+    private final Set<TopicPartition> subscribedPartitions;
+
+    /* the list of partitions currently assigned */
+    private final Set<TopicPartition> assignedPartitions;
+
+    /* the offset exposed to the user */
+    private final Map<TopicPartition, Long> consumed;
+
+    /* the current point we have fetched up to */
+    private final Map<TopicPartition, Long> fetched;
+
+    /* the last committed offset for each partition */
+    private final Map<TopicPartition, Long> committed;
+
+    /* do we need to request a partition assignment from the coordinator? */
+    private boolean needsPartitionAssignment;
+
+    /* do we need to request the latest committed offsets from the coordinator? */
+    private boolean needsFetchCommittedOffsets;
+
+    /* Partitions that need to be reset before fetching */
+    private Map<TopicPartition, OffsetResetStrategy> resetPartitions;
+
+    /* Default offset reset strategy */
+    private OffsetResetStrategy offsetResetStrategy;
+
+    public SubscriptionState(OffsetResetStrategy offsetResetStrategy) {
+        this.offsetResetStrategy = offsetResetStrategy;
+        this.subscribedTopics = new HashSet<String>();
+        this.subscribedPartitions = new HashSet<TopicPartition>();
+        this.assignedPartitions = new HashSet<TopicPartition>();
+        this.consumed = new HashMap<TopicPartition, Long>();
+        this.fetched = new HashMap<TopicPartition, Long>();
+        this.committed = new HashMap<TopicPartition, Long>();
+        this.needsPartitionAssignment = false;
+        this.needsFetchCommittedOffsets = true; // initialized to true so the consumer fetches committed offsets on startup
+        this.resetPartitions = new HashMap<TopicPartition, OffsetResetStrategy>();
+    }
+
+    public void subscribe(String topic) {
+        if (this.subscribedPartitions.size() > 0)
+            throw new IllegalStateException("Subscriptions to topics and to partitions are mutually exclusive");
+        if (!this.subscribedTopics.contains(topic)) {
+            this.subscribedTopics.add(topic);
+            this.needsPartitionAssignment = true;
+        }
+    }
+
+    public void unsubscribe(String topic) {
+        if (!this.subscribedTopics.contains(topic))
+            throw new IllegalStateException("Topic " + topic + " was never subscribed to.");
+        this.subscribedTopics.remove(topic);
+        this.needsPartitionAssignment = true;
+        for (TopicPartition tp: assignedPartitions())
+            if (topic.equals(tp.topic()))
+                clearPartition(tp);
+    }
+
+    public void needReassignment() {
+        this.needsPartitionAssignment = true;
+    }
+
+    public void subscribe(TopicPartition tp) {
+        if (this.subscribedTopics.size() > 0)
+            throw new IllegalStateException("Subscriptions to topics and to partitions are mutually exclusive");
+        this.subscribedPartitions.add(tp);
+        this.assignedPartitions.add(tp);
+    }
+
+    public void unsubscribe(TopicPartition partition) {
+        if (!subscribedPartitions.contains(partition))
+            throw new IllegalStateException("Partition " + partition + " was never subscribed to.");
+        subscribedPartitions.remove(partition);
+        clearPartition(partition);
+    }
+    
+    private void clearPartition(TopicPartition tp) {
+        this.assignedPartitions.remove(tp);
+        this.committed.remove(tp);
+        this.fetched.remove(tp);
+        this.consumed.remove(tp);
+        this.resetPartitions.remove(tp);
+    }
+
+    public void clearAssignment() {
+        this.assignedPartitions.clear();
+        this.committed.clear();
+        this.fetched.clear();
+        this.consumed.clear();
+        this.needsPartitionAssignment = !subscribedTopics().isEmpty();
+    }
+
+    public Set<String> subscribedTopics() {
+        return this.subscribedTopics;
+    }
+
+    public Long fetched(TopicPartition tp) {
+        return this.fetched.get(tp);
+    }
+
+    public void fetched(TopicPartition tp, long offset) {
+        if (!this.assignedPartitions.contains(tp))
+            throw new IllegalArgumentException("Can't change the fetch position for a partition you are not currently subscribed to.");
+        this.fetched.put(tp, offset);
+    }
+
+    public void committed(TopicPartition tp, long offset) {
+        this.committed.put(tp, offset);
+    }
+
+    public Long committed(TopicPartition tp) {
+        return this.committed.get(tp);
+    }
+
+    public void needRefreshCommits() {
+        this.needsFetchCommittedOffsets = true;
+    }
+
+    public boolean refreshCommitsNeeded() {
+        return this.needsFetchCommittedOffsets;
+    }
+
+    public void commitsRefreshed() {
+        this.needsFetchCommittedOffsets = false;
+    }
+    
+    public void seek(TopicPartition tp, long offset) {
+        fetched(tp, offset);
+        consumed(tp, offset);
+        resetPartitions.remove(tp);
+    }
+
+    public Set<TopicPartition> assignedPartitions() {
+        return this.assignedPartitions;
+    }
+
+    public boolean partitionsAutoAssigned() {
+        return !this.subscribedTopics.isEmpty();
+    }
+
+    public void consumed(TopicPartition tp, long offset) {
+        if (!this.assignedPartitions.contains(tp))
+            throw new IllegalArgumentException("Can't change the consumed position for a partition you are not currently subscribed to.");
+        this.consumed.put(tp, offset);
+    }
+
+    public Long consumed(TopicPartition partition) {
+        return this.consumed.get(partition);
+    }
+
+    public Map<TopicPartition, Long> allConsumed() {
+        return this.consumed;
+    }
+
+    public void needOffsetReset(TopicPartition partition, OffsetResetStrategy offsetResetStrategy) {
+        this.resetPartitions.put(partition, offsetResetStrategy);
+        this.fetched.remove(partition);
+        this.consumed.remove(partition);
+    }
+
+    public void needOffsetReset(TopicPartition partition) {
+        needOffsetReset(partition, offsetResetStrategy);
+    }
+
+    public boolean isOffsetResetNeeded(TopicPartition partition) {
+        return resetPartitions.containsKey(partition);
+    }
+
+    public boolean isOffsetResetNeeded() {
+        return !resetPartitions.isEmpty();
+    }
+
+    public OffsetResetStrategy resetStrategy(TopicPartition partition) {
+        return resetPartitions.get(partition);
+    }
+
+    public boolean hasAllFetchPositions() {
+        return this.fetched.size() >= this.assignedPartitions.size();
+    }
+
+    public Set<TopicPartition> missingFetchPositions() {
+        Set<TopicPartition> copy = new HashSet<TopicPartition>(this.assignedPartitions);
+        copy.removeAll(this.fetched.keySet());
+        return copy;
+    }
+
+    public boolean partitionAssignmentNeeded() {
+        return this.needsPartitionAssignment;
+    }
+
+    public void changePartitionAssignment(List<TopicPartition> assignments) {
+        for (TopicPartition tp : assignments)
+            if (!this.subscribedTopics.contains(tp.topic()))
+                throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
+        this.clearAssignment();
+        this.assignedPartitions.addAll(assignments);
+        this.needsPartitionAssignment = false;
+    }
+
+
+}
\ No newline at end of file
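
To make the flow above concrete, here is a short usage sketch against this class's own API; the topic name and offsets are invented for illustration:

    SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.LATEST);
    TopicPartition tp = new TopicPartition("example-topic", 0);   // hypothetical topic

    subscriptions.subscribe(tp);       // manual assignment; subscribe(String) would now throw
    subscriptions.seek(tp, 42L);       // sets both the fetched and consumed positions
    subscriptions.fetched(tp, 50L);    // advance the fetch position as records arrive
    subscriptions.consumed(tp, 45L);   // advance the consumed position as records are returned
    Long committed = subscriptions.committed(tp);   // null until an offset is committed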

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
new file mode 100644
index 0000000..f5e12d3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Cluster.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A representation of a subset of the nodes, topics, and partitions in the Kafka cluster.
+ */
+public final class Cluster {
+
+    private final List<Node> nodes;
+    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
+    private final Map<String, List<PartitionInfo>> partitionsByTopic;
+    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
+    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
+    private final Map<Integer, Node> nodesById;
+
+    /**
+     * Create a new cluster with the given nodes and partitions
+     * @param nodes The nodes in the cluster
+     * @param partitions Information about a subset of the topic-partitions this cluster hosts
+     */
+    public Cluster(Collection<Node> nodes, Collection<PartitionInfo> partitions) {
+        // make a randomized, unmodifiable copy of the nodes
+        List<Node> copy = new ArrayList<Node>(nodes);
+        Collections.shuffle(copy);
+        this.nodes = Collections.unmodifiableList(copy);
+        
+        this.nodesById = new HashMap<Integer, Node>();
+        for (Node node: nodes)
+            this.nodesById.put(node.id(), node);
+
+        // index the partitions by topic/partition for quick lookup
+        this.partitionsByTopicPartition = new HashMap<TopicPartition, PartitionInfo>(partitions.size());
+        for (PartitionInfo p : partitions)
+            this.partitionsByTopicPartition.put(new TopicPartition(p.topic(), p.partition()), p);
+
+        // index the partitions by topic and node respectively, and make the lists
+        // unmodifiable so we can hand them out in user-facing apis without risk
+        // of the client modifying the contents
+        HashMap<String, List<PartitionInfo>> partsForTopic = new HashMap<String, List<PartitionInfo>>();
+        HashMap<Integer, List<PartitionInfo>> partsForNode = new HashMap<Integer, List<PartitionInfo>>();
+        for (Node n : this.nodes) {
+            partsForNode.put(n.id(), new ArrayList<PartitionInfo>());
+        }
+        for (PartitionInfo p : partitions) {
+            if (!partsForTopic.containsKey(p.topic()))
+                partsForTopic.put(p.topic(), new ArrayList<PartitionInfo>());
+            List<PartitionInfo> psTopic = partsForTopic.get(p.topic());
+            psTopic.add(p);
+
+            if (p.leader() != null) {
+                List<PartitionInfo> psNode = Utils.notNull(partsForNode.get(p.leader().id()));
+                psNode.add(p);
+            }
+        }
+        this.partitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+        this.availablePartitionsByTopic = new HashMap<String, List<PartitionInfo>>(partsForTopic.size());
+        for (Map.Entry<String, List<PartitionInfo>> entry : partsForTopic.entrySet()) {
+            String topic = entry.getKey();
+            List<PartitionInfo> partitionList = entry.getValue();
+            this.partitionsByTopic.put(topic, Collections.unmodifiableList(partitionList));
+            List<PartitionInfo> availablePartitions = new ArrayList<PartitionInfo>();
+            for (PartitionInfo part : partitionList) {
+                if (part.leader() != null)
+                    availablePartitions.add(part);
+            }
+            this.availablePartitionsByTopic.put(topic, Collections.unmodifiableList(availablePartitions));
+        }
+        this.partitionsByNode = new HashMap<Integer, List<PartitionInfo>>(partsForNode.size());
+        for (Map.Entry<Integer, List<PartitionInfo>> entry : partsForNode.entrySet())
+            this.partitionsByNode.put(entry.getKey(), Collections.unmodifiableList(entry.getValue()));
+
+    }
+
+    /**
+     * Create an empty cluster instance with no nodes and no topic-partitions.
+     */
+    public static Cluster empty() {
+        return new Cluster(new ArrayList<Node>(0), new ArrayList<PartitionInfo>(0));
+    }
+
+    /**
+     * Create a "bootstrap" cluster using the given list of host/ports
+     * @param addresses The addresses
+     * @return A cluster for these hosts/ports
+     */
+    public static Cluster bootstrap(List<InetSocketAddress> addresses) {
+        List<Node> nodes = new ArrayList<Node>();
+        int nodeId = -1;
+        for (InetSocketAddress address : addresses)
+            nodes.add(new Node(nodeId--, address.getHostName(), address.getPort()));
+        return new Cluster(nodes, new ArrayList<PartitionInfo>(0));
+    }
+
+    /**
+     * @return The known set of nodes
+     */
+    public List<Node> nodes() {
+        return this.nodes;
+    }
+    
+    /**
+     * Get the node by the node id (or null if no such node exists)
+     * @param id The id of the node
+     * @return The node, or null if no such node exists
+     */
+    public Node nodeById(int id) {
+        return this.nodesById.get(id);
+    }
+
+    /**
+     * Get the current leader for the given topic-partition
+     * @param topicPartition The topic and partition we want to know the leader for
+     * @return The node that is the leader for this topic-partition, or null if there is currently no leader
+     */
+    public Node leaderFor(TopicPartition topicPartition) {
+        PartitionInfo info = partitionsByTopicPartition.get(topicPartition);
+        if (info == null)
+            return null;
+        else
+            return info.leader();
+    }
+
+    /**
+     * Get the metadata for the specified partition
+     * @param topicPartition The topic and partition to fetch info for
+     * @return The metadata about the given topic and partition
+     */
+    public PartitionInfo partition(TopicPartition topicPartition) {
+        return partitionsByTopicPartition.get(topicPartition);
+    }
+
+    /**
+     * Get the list of partitions for this topic
+     * @param topic The topic name
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> partitionsForTopic(String topic) {
+        return this.partitionsByTopic.get(topic);
+    }
+
+    /**
+     * Get the list of available partitions for this topic
+     * @param topic The topic name
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> availablePartitionsForTopic(String topic) {
+        return this.availablePartitionsByTopic.get(topic);
+    }
+
+    /**
+     * Get the list of partitions whose leader is this node
+     * @param nodeId The node id
+     * @return A list of partitions
+     */
+    public List<PartitionInfo> partitionsForNode(int nodeId) {
+        return this.partitionsByNode.get(nodeId);
+    }
+
+    /**
+     * Get all topics.
+     * @return a set of all topics
+     */
+    public Set<String> topics() {
+        return this.partitionsByTopic.keySet();
+    }
+
+    @Override
+    public String toString() {
+        return "Cluster(nodes = " + this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
+    }
+
+}
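
For context, the bootstrap path is how a client starts before any metadata response arrives; the broker addresses below are hypothetical:

    List<InetSocketAddress> seeds = Arrays.asList(
            new InetSocketAddress("broker1.example.com", 9092),
            new InetSocketAddress("broker2.example.com", 9092));
    Cluster cluster = Cluster.bootstrap(seeds);
    // The seed nodes get synthetic negative ids (-1, -2, ...) and no
    // partitions are known until real metadata replaces this cluster.
    assert cluster.topics().isEmpty();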

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
new file mode 100644
index 0000000..fef2136
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Configurable.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A Mix-in style interface for classes that are instantiated by reflection and need to take configuration parameters
+ */
+public interface Configurable {
+
+    /**
+     * Configure this class with the given key-value pairs
+     */
+    public void configure(Map<String, ?> configs);
+
+}
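
As a sketch of the pattern, a reflectively instantiated class reads its own settings inside configure; the class name and configuration key are invented for illustration:

    public class ExamplePlugin implements Configurable {     // hypothetical class
        private int retries;

        @Override
        public void configure(Map<String, ?> configs) {
            Object value = configs.get("example.plugin.retries");   // hypothetical key
            this.retries = (value == null) ? 0 : Integer.parseInt(value.toString());
        }
    }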

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
new file mode 100644
index 0000000..d9df6e8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/KafkaException.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The base class of all other Kafka exceptions
+ */
+public class KafkaException extends RuntimeException {
+
+    private final static long serialVersionUID = 1L;
+
+    public KafkaException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public KafkaException(String message) {
+        super(message);
+    }
+
+    public KafkaException(Throwable cause) {
+        super(cause);
+    }
+
+    public KafkaException() {
+        super();
+    }
+
+}
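
Since every Kafka-specific failure derives from this unchecked base, a caller can funnel all of them through a single handler; a hedged sketch, where doWork stands in for any backported Kafka call:

    try {
        doWork();                       // placeholder for any backported Kafka call
    } catch (RetriableException e) {
        // transient failure: safe to retry after a backoff
    } catch (KafkaException e) {
        // non-retriable failure: surface it to the caller
        throw e;
    }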

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
new file mode 100644
index 0000000..8858ffe
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Metric.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A numerical metric tracked for monitoring purposes
+ */
+public interface Metric {
+
+    /**
+     * A name for this metric
+     */
+    public MetricName metricName();
+
+    /**
+     * The value of the metric
+     */
+    public double value();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
new file mode 100644
index 0000000..18dd955
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/MetricName.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The <code>MetricName</code> class encapsulates a metric's name, logical group and its related attributes
+ * <p>
+ * This class captures the following parameters
+ * <pre>
+ *  <b>name</b> The name of the metric
+ *  <b>group</b> logical group name of the metrics to which this metric belongs.
+ *  <b>description</b> A human-readable description to include in the metric. This is optional.
+ *  <b>tags</b> additional key/value attributes of the metric. This is optional.
+ * </pre>
+ * The group and tags parameters can be used to create unique metric names while reporting in JMX or any custom reporting.
+ * <p>
+ * For example, a standard JMX MBean name can be constructed like <b>domainName:type=group,key1=val1,key2=val2</b>
+ * <p>
+ * Usage looks something like this:
+ * <pre>{@code
+ * // set up metrics:
+ * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
+ * Sensor sensor = metrics.sensor("message-sizes");
+ *
+ * Map<String, String> metricTags = new LinkedHashMap<String, String>();
+ * metricTags.put("client-id", "producer-1");
+ * metricTags.put("topic", "topic");
+ *
+ * MetricName metricName = new MetricName("message-size-avg", "producer-metrics", "average message size", metricTags);
+ * sensor.add(metricName, new Avg());
+ *
+ * metricName = new MetricName("message-size-max", "producer-metrics", metricTags);
+ * sensor.add(metricName, new Max());
+ *
+ * metricName = new MetricName("message-size-min", "producer-metrics", "message minimum size", "client-id", "my-client", "topic", "my-topic");
+ * sensor.add(metricName, new Min());
+ *
+ * // as messages are sent we record the sizes
+ * sensor.record(messageSize);
+ * }</pre>
+ */
+public final class MetricName {
+
+    private final String name;
+    private final String group;
+    private final String description;
+    private Map<String, String> tags;
+    private int hash = 0;
+
+    /**
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     * @param tags        additional key/value attributes of the metric
+     */
+    public MetricName(String name, String group, String description, Map<String, String> tags) {
+        this.name = Utils.notNull(name);
+        this.group = Utils.notNull(group);
+        this.description = Utils.notNull(description);
+        this.tags = Utils.notNull(tags);
+    }
+
+    /**
+     * @param name          The name of the metric
+     * @param group         logical group name of the metrics to which this metric belongs
+     * @param description   A human-readable description to include in the metric
+     * @param keyValue      additional key/value attributes of the metric (must come in pairs)
+     */
+    public MetricName(String name, String group, String description, String... keyValue) {
+        this(name, group, description, getTags(keyValue));
+    }
+
+    private static Map<String, String> getTags(String... keyValue) {
+        if ((keyValue.length % 2) != 0)
+            throw new IllegalArgumentException("keyValue needs to be specified in pairs");
+        Map<String, String> tags = new HashMap<String, String>();
+
+        for (int i = 0; i < keyValue.length; i += 2)
+            tags.put(keyValue[i], keyValue[i + 1]);
+        return tags;
+    }
+
+    /**
+     * @param name  The name of the metric
+     * @param group logical group name of the metrics to which this metric belongs
+     * @param tags  key/value attributes of the metric
+     */
+    public MetricName(String name, String group, Map<String, String> tags) {
+        this(name, group, "", tags);
+    }
+
+    /**
+     * @param name        The name of the metric
+     * @param group       logical group name of the metrics to which this metric belongs
+     * @param description A human-readable description to include in the metric
+     */
+    public MetricName(String name, String group, String description) {
+        this(name, group, description, new HashMap<String, String>());
+    }
+
+    /**
+     * @param name  The name of the metric
+     * @param group logical group name of the metrics to which this metric belongs
+     */
+    public MetricName(String name, String group) {
+        this(name, group, "", new HashMap<String, String>());
+    }
+
+    public String name() {
+        return this.name;
+    }
+
+    public String group() {
+        return this.group;
+    }
+
+    public Map<String, String> tags() {
+        return this.tags;
+    }
+
+    public String description() {
+        return this.description;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0)
+            return hash;
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((group == null) ? 0 : group.hashCode());
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + ((tags == null) ? 0 : tags.hashCode());
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        MetricName other = (MetricName) obj;
+        if (group == null) {
+            if (other.group != null)
+                return false;
+        } else if (!group.equals(other.group))
+            return false;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (tags == null) {
+            if (other.tags != null)
+                return false;
+        } else if (!tags.equals(other.tags))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "MetricName [name=" + name + ", group=" + group + ", description="
+                + description + ", tags=" + tags + "]";
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
new file mode 100644
index 0000000..dd0537e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/Node.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+import java.io.Serializable;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Information about a Kafka node
+ */
+public class Node implements Serializable {
+
+    private final int id;
+    private final String idString;
+    private final String host;
+    private final int port;
+
+    public Node(int id, String host, int port) {
+        super();
+        this.id = id;
+        this.idString = Integer.toString(id);
+        this.host = host;
+        this.port = port;
+    }
+
+    public static Node noNode() {
+        return new Node(-1, "", -1);
+    }
+
+    /**
+     * The node id of this node
+     */
+    public int id() {
+        return id;
+    }
+
+    /**
+     * String representation of the node id.
+     * Typically the integer id is used to serialize over the wire, while the string representation is used as an identifier in the NetworkClient code.
+     */
+    public String idString() {
+        return idString;
+    }
+
+    /**
+     * The host name for this node
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * The port for this node
+     */
+    public int port() {
+        return port;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((host == null) ? 0 : host.hashCode());
+        result = prime * result + id;
+        result = prime * result + port;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        Node other = (Node) obj;
+        if (host == null) {
+            if (other.host != null)
+                return false;
+        } else if (!host.equals(other.host))
+            return false;
+        if (id != other.id)
+            return false;
+        if (port != other.port)
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "Node(" + id + ", " + host + ", " + port + ")";
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
new file mode 100644
index 0000000..ac7cc61
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/PartitionInfo.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Information about a topic-partition.
+ */
+public class PartitionInfo {
+
+    private final String topic;
+    private final int partition;
+    private final Node leader;
+    private final Node[] replicas;
+    private final Node[] inSyncReplicas;
+
+    public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {
+        this.topic = topic;
+        this.partition = partition;
+        this.leader = leader;
+        this.replicas = replicas;
+        this.inSyncReplicas = inSyncReplicas;
+    }
+
+    /**
+     * The topic name
+     */
+    public String topic() {
+        return topic;
+    }
+
+    /**
+     * The partition id
+     */
+    public int partition() {
+        return partition;
+    }
+
+    /**
+     * The node currently acting as leader for this partition, or null if there is no leader
+     */
+    public Node leader() {
+        return leader;
+    }
+
+    /**
+     * The complete set of replicas for this partition regardless of whether they are alive or up-to-date
+     */
+    public Node[] replicas() {
+        return replicas;
+    }
+
+    /**
+     * The subset of the replicas that are in sync, that is, caught up to the leader and ready to take over as leader
+     * if the leader should fail
+     */
+    public Node[] inSyncReplicas() {
+        return inSyncReplicas;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Partition(topic = %s, partition = %d, leader = %s, replicas = %s, isr = %s)",
+                topic,
+                partition,
+                leader == null ? "none" : leader.id(),
+                fmtNodeIds(replicas),
+                fmtNodeIds(inSyncReplicas));
+    }
+
+    /* Extract the node ids from each item in the array and format for display */
+    private String fmtNodeIds(Node[] nodes) {
+        StringBuilder b = new StringBuilder("[");
+        for (int i = 0; i < nodes.length; i++) {
+            b.append(Integer.toString(nodes[i].id()));
+            if (i < nodes.length - 1)
+                b.append(',');
+        }
+        b.append("]");
+        return b.toString();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
new file mode 100644
index 0000000..cfb4848
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/TopicPartition.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kafka_backport.common;
+
+import java.io.Serializable;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A topic name and partition number
+ */
+public final class TopicPartition implements Serializable {
+
+    private int hash = 0;
+    private final int partition;
+    private final String topic;
+
+    public TopicPartition(String topic, int partition) {
+        this.partition = partition;
+        this.topic = topic;
+    }
+
+    public int partition() {
+        return partition;
+    }
+
+    public String topic() {
+        return topic;
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash != 0)
+            return hash;
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + partition;
+        result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+        this.hash = result;
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        TopicPartition other = (TopicPartition) obj;
+        if (partition != other.partition)
+            return false;
+        if (topic == null) {
+            if (other.topic != null)
+                return false;
+        } else if (!topic.equals(other.topic))
+            return false;
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return topic + "-" + partition;
+    }
+
+}
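
The value semantics (equals and hashCode over topic and partition, with the hash cached after first use) are what allow this type to serve as a map key throughout the backport, as SubscriptionState does above. A small sketch with an invented topic:

    Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
    offsets.put(new TopicPartition("example-topic", 3), 42L);
    // A freshly constructed but equal key locates the same entry:
    Long offset = offsets.get(new TopicPartition("example-topic", 3));   // 42L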

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
new file mode 100644
index 0000000..1b5cbc9
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/AbstractConfig.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.config;
+
+import org.apache.flink.kafka_backport.common.Configurable;
+import org.apache.flink.kafka_backport.common.KafkaException;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A convenient base class for configurations to extend.
+ * <p>
+ * This class holds both the original configuration that was provided as well as the parsed and validated values.
+ */
+public class AbstractConfig {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    /* configs for which values have been requested, used to detect unused configs */
+    private final Set<String> used;
+
+    /* the original values passed in by the user */
+    private final Map<String, ?> originals;
+
+    /* the parsed values */
+    private final Map<String, Object> values;
+
+    @SuppressWarnings("unchecked")
+    public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
+        /* check that all the keys are really strings */
+        for (Object key : originals.keySet())
+            if (!(key instanceof String))
+                throw new ConfigException(key.toString(), originals.get(key), "Key must be a string.");
+        this.originals = (Map<String, ?>) originals;
+        this.values = definition.parse(this.originals);
+        this.used = Collections.synchronizedSet(new HashSet<String>());
+        logAll();
+    }
+
+    protected Object get(String key) {
+        if (!values.containsKey(key))
+            throw new ConfigException(String.format("Unknown configuration '%s'", key));
+        used.add(key);
+        return values.get(key);
+    }
+
+    public Short getShort(String key) {
+        return (Short) get(key);
+    }
+
+    public Integer getInt(String key) {
+        return (Integer) get(key);
+    }
+
+    public Long getLong(String key) {
+        return (Long) get(key);
+    }
+
+    public Double getDouble(String key) {
+        return (Double) get(key);
+    }
+
+    @SuppressWarnings("unchecked")
+    public List<String> getList(String key) {
+        return (List<String>) get(key);
+    }
+
+    public boolean getBoolean(String key) {
+        return (Boolean) get(key);
+    }
+
+    public String getString(String key) {
+        return (String) get(key);
+    }
+
+    public Class<?> getClass(String key) {
+        return (Class<?>) get(key);
+    }
+
+    public Set<String> unused() {
+        Set<String> keys = new HashSet<String>(originals.keySet());
+        keys.removeAll(used);
+        return keys;
+    }
+
+    public Map<String, Object> originals() {
+        Map<String, Object> copy = new HashMap<String, Object>();
+        copy.putAll(originals);
+        return copy;
+    }
+
+    private void logAll() {
+        StringBuilder b = new StringBuilder();
+        b.append(getClass().getSimpleName());
+        b.append(" values: ");
+        b.append(Utils.NL);
+        for (Map.Entry<String, Object> entry : this.values.entrySet()) {
+            b.append('\t');
+            b.append(entry.getKey());
+            b.append(" = ");
+            b.append(entry.getValue());
+            b.append(Utils.NL);
+        }
+        log.info(b.toString());
+    }
+
+    /**
+     * Log warnings for any unused configurations
+     */
+    public void logUnused() {
+        for (String key : unused())
+            log.warn("The configuration {} = {} was supplied but isn't a known config.", key, this.originals.get(key));
+    }
+
+    /**
+     * Get a configured instance of the class specified by the given configuration key. If the object implements
+     * Configurable, configure it using the configuration.
+     * 
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return A configured instance of the class
+     */
+    public <T> T getConfiguredInstance(String key, Class<T> t) {
+        Class<?> c = getClass(key);
+        if (c == null)
+            return null;
+        Object o = Utils.newInstance(c);
+        if (!t.isInstance(o))
+            throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
+        if (o instanceof Configurable)
+            ((Configurable) o).configure(this.originals);
+        return t.cast(o);
+    }
+
+    public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
+        List<String> klasses = getList(key);
+        List<T> objects = new ArrayList<T>();
+        for (String klass : klasses) {
+            Class<?> c;
+            try {
+                c = Class.forName(klass);
+            } catch (ClassNotFoundException e) {
+                throw new ConfigException(key, klass, "Class " + klass + " could not be found.");
+            }
+            Object o = Utils.newInstance(c);
+            if (!t.isInstance(o))
+                throw new KafkaException(c.getName() + " is not an instance of " + t.getName());
+            if (o instanceof Configurable)
+                ((Configurable) o).configure(this.originals);
+            objects.add(t.cast(o));
+        }
+        return objects;
+    }
+
+}

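Illustrative sketch (hypothetical config class and key names): configuration classes typically extend AbstractConfig, handing a ConfigDef plus the user-supplied map to the super constructor, which parses, validates, and logs every value. Something like:

    import org.apache.flink.kafka_backport.common.config.AbstractConfig;
    import org.apache.flink.kafka_backport.common.config.ConfigDef;

    import java.util.Map;

    public class ExampleConfig extends AbstractConfig {

        private static final ConfigDef CONFIG = new ConfigDef()
            .define("bootstrap.servers", ConfigDef.Type.LIST,
                    ConfigDef.Importance.HIGH, "Initial list of brokers.")
            .define("retries", ConfigDef.Type.INT, 0, ConfigDef.Range.atLeast(0),
                    ConfigDef.Importance.MEDIUM, "Number of retries on failure.");

        public ExampleConfig(Map<?, ?> props) {
            super(CONFIG, props); // parses and validates props against CONFIG
        }
    }

Constructing ExampleConfig from a map that omits "bootstrap.servers" throws a ConfigException, since that key has no default and the overload used marks it required; "retries" falls back to 0 and is range-checked by Range.atLeast(0).
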
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
new file mode 100644
index 0000000..1bbe891
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigDef.java
@@ -0,0 +1,456 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.config;
+
+import org.apache.flink.kafka_backport.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * This class is used for specifying the set of expected configurations, their type, their defaults, their
+ * documentation, and any special validation logic used for checking the correctness of the values the user provides.
+ * <p/>
+ * Usage of this class looks something like this:
+ * <p/>
+ * <pre>
+ * ConfigDef defs = new ConfigDef();
+ * defs.define(&quot;config_name&quot;, Type.STRING, &quot;default string value&quot;, &quot;This configuration is used for blah blah blah.&quot;);
+ * defs.define(&quot;another_config_name&quot;, Type.INT, 42, Range.atLeast(0), &quot;More documentation on this config&quot;);
+ *
+ * Properties props = new Properties();
+ * props.setProperty(&quot;config_name&quot;, &quot;some value&quot;);
+ * Map&lt;String, Object&gt; configs = defs.parse(props);
+ *
+ * String someConfig = (String) configs.get(&quot;config_name&quot;); // will return &quot;some value&quot;
+ * int anotherConfig = (Integer) configs.get(&quot;another_config_name&quot;); // will return default value of 42
+ * </pre>
+ * <p/>
+ * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional
+ * functionality for accessing configs.
+ */
+public class ConfigDef {
+
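+    /* Sentinel for "no default value was given". Deliberately a fresh String
+       instance so that only reference equality (==) with this exact object
+       matches, never a user-supplied value such as "". */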
+    private static final Object NO_DEFAULT_VALUE = new String("");
+
+    private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>();
+
+    /**
+     * Returns an unmodifiable set of the property names defined in this {@linkplain ConfigDef}
+     *
+     * @return new unmodifiable {@link Set} instance containing the keys
+     */
+    public Set<String> names() {
+        return Collections.unmodifiableSet(configKeys.keySet());
+    }
+
+    /**
+     * Define a new configuration
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param validator     A validator to use in checking the correctness of the config
+     * @param importance    The importance of this config: is this something you will likely need to change?
+     * @param documentation The documentation string for the config
+     * @param required      Whether parsing should fail if the property is not set and no default value is specified
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
+                            boolean required) {
+        if (configKeys.containsKey(name))
+            throw new ConfigException("Configuration " + name + " is defined twice.");
+        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
+        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, required));
+        return this;
+    }
+
+    /**
+     * Define a new required configuration
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param validator     A validator to use in checking the correctness of the config
+     * @param importance    The importance of this config: is this something you will likely need to change?
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) {
+        return define(name, type, defaultValue, validator, importance, documentation, true);
+    }
+
+    /**
+     * Define a new configuration with no special validation logic
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param defaultValue  The default value to use if this config isn't present
+     * @param importance    The importance of this config: is this something you will likely need to change?
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation) {
+        return define(name, type, defaultValue, null, importance, documentation, true);
+    }
+
+    /**
+     * Define a required parameter with no default value
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param validator     A validator to use in checking the correctness of the config
+     * @param importance    The importance of this config: is this something you will likely need to change?
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Validator validator, Importance importance, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, validator, importance, documentation, true);
+    }
+
+    /**
+     * Define a required parameter with no default value and no special validation logic
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param importance    The importance of this config: is this something you will likely need to change?
+     * @param documentation The documentation string for the config
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, true);
+    }
+
+    /**
+     * Define a required parameter with no default value and no special validation logic
+     *
+     * @param name          The name of the config parameter
+     * @param type          The type of the config
+     * @param importance    The importance of this config: is this something you will likely need to change?
+     * @param documentation The documentation string for the config
+     * @param required      Whether parsing should fail if the property is not set and no default value is specified
+     * @return This ConfigDef so you can chain calls
+     */
+    public ConfigDef define(String name, Type type, Importance importance, String documentation, boolean required) {
+        return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, required);
+    }
+
+
+    /**
+     * Parse and validate configs against this configuration definition. The input is a map of configs. It is expected
+     * that the keys of the map are strings, but the values can either be strings or they may already be of the
+     * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a
+     * programmatically constructed map.
+     *
+     * @param props The configs to parse and validate
+     * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into
+     * the appropriate type (int, string, etc)
+     */
+    public Map<String, Object> parse(Map<?, ?> props) {
+        /* parse all known keys */
+        Map<String, Object> values = new HashMap<String, Object>();
+        for (ConfigKey key : configKeys.values()) {
+            Object value;
+            // the props map contains the setting - parse the supplied value
+            if (props.containsKey(key.name))
+                value = parseType(key.name, props.get(key.name), key.type);
+            // the props map doesn't contain the setting, the key is required and no default value is specified - it's an error
+            else if (key.defaultValue == NO_DEFAULT_VALUE && key.required)
+                throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value.");
+            // the props map doesn't contain the setting, no default value is specified and the key is not required - assign null
+            else if (!key.hasDefault() && !key.required)
+                value = null;
+            // otherwise assign the setting its default value
+            else
+                value = key.defaultValue;
+            if (key.validator != null)
+                key.validator.ensureValid(key.name, value);
+            values.put(key.name, value);
+        }
+        return values;
+    }
+
+    /**
+     * Parse a value according to its expected type.
+     *
+     * @param name  The config name
+     * @param value The config value
+     * @param type  The expected type
+     * @return The parsed object
+     */
+    private Object parseType(String name, Object value, Type type) {
+        try {
+            String trimmed = null;
+            if (value instanceof String)
+                trimmed = ((String) value).trim();
+            switch (type) {
+                case BOOLEAN:
+                    if (value instanceof String) {
+                        if (trimmed.equalsIgnoreCase("true"))
+                            return true;
+                        else if (trimmed.equalsIgnoreCase("false"))
+                            return false;
+                        else
+                            throw new ConfigException(name, value, "Expected value to be either true or false");
+                    } else if (value instanceof Boolean)
+                        return value;
+                    else
+                        throw new ConfigException(name, value, "Expected value to be either true or false");
+                case STRING:
+                    if (value instanceof String)
+                        return trimmed;
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a string, but it was a " + value.getClass().getName());
+                case INT:
+                    if (value instanceof Integer) {
+                        return (Integer) value;
+                    } else if (value instanceof String) {
+                        return Integer.parseInt(trimmed);
+                    } else {
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                    }
+                case SHORT:
+                    if (value instanceof Short) {
+                        return (Short) value;
+                    } else if (value instanceof String) {
+                        return Short.parseShort(trimmed);
+                    } else {
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                    }
+                case LONG:
+                    if (value instanceof Integer)
+                        return ((Integer) value).longValue();
+                    if (value instanceof Long)
+                        return (Long) value;
+                    else if (value instanceof String)
+                        return Long.parseLong(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                case DOUBLE:
+                    if (value instanceof Number)
+                        return ((Number) value).doubleValue();
+                    else if (value instanceof String)
+                        return Double.parseDouble(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected value to be a number.");
+                case LIST:
+                    if (value instanceof List)
+                        return (List<?>) value;
+                    else if (value instanceof String)
+                        if (trimmed.isEmpty())
+                            return Collections.emptyList();
+                        else
+                            return Arrays.asList(trimmed.split("\\s*,\\s*", -1));
+                    else
+                        throw new ConfigException(name, value, "Expected a comma separated list.");
+                case CLASS:
+                    if (value instanceof Class)
+                        return (Class<?>) value;
+                    else if (value instanceof String)
+                        return Class.forName(trimmed);
+                    else
+                        throw new ConfigException(name, value, "Expected a Class instance or class name.");
+                default:
+                    throw new IllegalStateException("Unknown type.");
+            }
+        } catch (NumberFormatException e) {
+            throw new ConfigException(name, value, "Not a number of type " + type);
+        } catch (ClassNotFoundException e) {
+            throw new ConfigException(name, value, "Class " + value + " could not be found.");
+        }
+    }
+
+    /**
+     * The config types
+     */
+    public enum Type {
+        BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS;
+    }
+
+    public enum Importance {
+        HIGH, MEDIUM, LOW
+    }
+
+    /**
+     * Validation logic the user may provide
+     */
+    public interface Validator {
+        public void ensureValid(String name, Object o);
+    }
+
+    /**
+     * Validation logic for numeric ranges
+     */
+    public static class Range implements Validator {
+        private final Number min;
+        private final Number max;
+
+        private Range(Number min, Number max) {
+            this.min = min;
+            this.max = max;
+        }
+
+        /**
+         * A numeric range that checks only the lower bound
+         *
+         * @param min The minimum acceptable value
+         */
+        public static Range atLeast(Number min) {
+            return new Range(min, null);
+        }
+
+        /**
+         * A numeric range that checks both the upper and lower bound
+         */
+        public static Range between(Number min, Number max) {
+            return new Range(min, max);
+        }
+
+        public void ensureValid(String name, Object o) {
+            Number n = (Number) o;
+            if (min != null && n.doubleValue() < min.doubleValue())
+                throw new ConfigException(name, o, "Value must be at least " + min);
+            if (max != null && n.doubleValue() > max.doubleValue())
+                throw new ConfigException(name, o, "Value must be no more than " + max);
+        }
+
+        public String toString() {
+            if (min == null)
+                return "[...," + max + "]";
+            else if (max == null)
+                return "[" + min + ",...]";
+            else
+                return "[" + min + ",...," + max + "]";
+        }
+    }
+
+    public static class ValidString implements Validator {
+        List<String> validStrings;
+
+        private ValidString(List<String> validStrings) {
+            this.validStrings = validStrings;
+        }
+
+        public static ValidString in(String... validStrings) {
+            return new ValidString(Arrays.asList(validStrings));
+        }
+
+        @Override
+        public void ensureValid(String name, Object o) {
+            String s = (String) o;
+            if (!validStrings.contains(s)) {
+                throw new ConfigException(name, o, "String must be one of: " + Utils.join(validStrings, ", "));
+            }
+
+        }
+
+        public String toString() {
+            return "[" + Utils.join(validStrings, ", ") + "]";
+        }
+    }
+
+    private static class ConfigKey {
+        public final String name;
+        public final Type type;
+        public final String documentation;
+        public final Object defaultValue;
+        public final Validator validator;
+        public final Importance importance;
+        public final boolean required;
+
+        public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, boolean required) {
+            super();
+            this.name = name;
+            this.type = type;
+            this.defaultValue = defaultValue;
+            this.validator = validator;
+            this.importance = importance;
+            if (this.validator != null)
+                this.validator.ensureValid(name, defaultValue);
+            this.documentation = documentation;
+            this.required = required;
+        }
+
+        public boolean hasDefault() {
+            return this.defaultValue != NO_DEFAULT_VALUE;
+        }
+
+    }
+
+    public String toHtmlTable() {
+        // sort keys with no default value first, then by importance, then by name
+        List<ConfigKey> configs = new ArrayList<ConfigKey>(this.configKeys.values());
+        Collections.sort(configs, new Comparator<ConfigKey>() {
+            public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) {
+                // first take anything with no default value
+                if (!k1.hasDefault() && k2.hasDefault())
+                    return -1;
+                else if (!k2.hasDefault() && k1.hasDefault())
+                    return 1;
+
+                // then sort by importance
+                int cmp = k1.importance.compareTo(k2.importance);
+                if (cmp == 0)
+                    // then sort in alphabetical order
+                    return k1.name.compareTo(k2.name);
+                else
+                    return cmp;
+            }
+        });
+        StringBuilder b = new StringBuilder();
+        b.append("<table>\n");
+        b.append("<tr>\n");
+        b.append("<th>Name</th>\n");
+        b.append("<th>Type</th>\n");
+        b.append("<th>Default</th>\n");
+        b.append("<th>Importance</th>\n");
+        b.append("<th>Description</th>\n");
+        b.append("</tr>\n");
+        for (ConfigKey def : configs) {
+            b.append("<tr>\n");
+            b.append("<td>");
+            b.append(def.name);
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.type.toString().toLowerCase());
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.defaultValue == null ? "" : def.defaultValue);
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.importance.toString().toLowerCase());
+            b.append("</td>");
+            b.append("<td>");
+            b.append(def.documentation);
+            b.append("</td>");
+            b.append("</tr>\n");
+        }
+        b.append("</table>");
+        return b.toString();
+    }
+}
\ No newline at end of file

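Illustrative sketch (hypothetical validator): the Validator hook above lets callers attach arbitrary checks to a key. Note that parse() also runs the validator for optional keys whose value may be null, so a defensive validator tolerates null:

    import org.apache.flink.kafka_backport.common.config.ConfigDef;
    import org.apache.flink.kafka_backport.common.config.ConfigException;

    public class LowerCaseValidator implements ConfigDef.Validator {

        @Override
        public void ensureValid(String name, Object o) {
            if (o == null)
                return; // optional key with no value: nothing to check
            String s = (String) o;
            if (!s.equals(s.toLowerCase()))
                throw new ConfigException(name, o, "String must be lower case");
        }
    }

Such a validator would plug in through one of the define() overloads, e.g. define("client.id", Type.STRING, "client", new LowerCaseValidator(), Importance.LOW, "Client identifier.").
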
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
new file mode 100644
index 0000000..13b9410
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/config/ConfigException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.config;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Thrown if the user supplies an invalid configuration
+ */
+public class ConfigException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ConfigException(String message) {
+        super(message);
+    }
+
+    public ConfigException(String name, Object value) {
+        this(name, value, null);
+    }
+
+    public ConfigException(String name, Object value, String message) {
+        super("Invalid value " + value + " for configuration " + name + (message == null ? "" : ": " + message));
+    }
+
+}

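Illustrative sketch: the three-argument constructor above composes a uniform message, which is what ConfigDef's validators throw. For example:

    import org.apache.flink.kafka_backport.common.config.ConfigException;

    public class ConfigExceptionExample {
        public static void main(String[] args) {
            ConfigException e = new ConfigException("retries", -1, "Value must be at least 0");
            // prints: Invalid value -1 for configuration retries: Value must be at least 0
            System.out.println(e.getMessage());
        }
    }
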
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
new file mode 100644
index 0000000..1e6f7ec
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/errors/ApiException.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.kafka_backport.common.errors;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Any API exception that is part of the public protocol should be a subclass of this class and be part of this
+ * package.
+ */
+public class ApiException extends KafkaException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ApiException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public ApiException(String message) {
+        super(message);
+    }
+
+    public ApiException(Throwable cause) {
+        super(cause);
+    }
+
+    public ApiException() {
+        super();
+    }
+
+    /* avoid the expensive and useless stack trace for api exceptions */
+    @Override
+    public Throwable fillInStackTrace() {
+        return this;
+    }
+
+}


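Illustrative sketch (standalone demo): overriding fillInStackTrace() to return this, as ApiException does above, skips the native stack capture, so such exceptions are cheap to construct and carry an empty stack trace:

    public class StackTraceCostDemo {
        public static void main(String[] args) {
            RuntimeException cheap = new RuntimeException("no trace") {
                @Override
                public Throwable fillInStackTrace() {
                    return this; // skip the expensive native stack walk
                }
            };
            System.out.println(cheap.getStackTrace().length); // prints 0
        }
    }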