flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [23/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
Date Thu, 27 Aug 2015 11:25:40 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/NetworkClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/NetworkClient.java
new file mode 100644
index 0000000..90258bb
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/NetworkClient.java
@@ -0,0 +1,528 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+import org.apache.flink.kafka_backport.common.Node;
+import org.apache.flink.kafka_backport.common.network.NetworkReceive;
+import org.apache.flink.kafka_backport.common.network.Send;
+import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
+import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
+import org.apache.flink.kafka_backport.common.protocol.types.Struct;
+import org.apache.flink.kafka_backport.common.requests.MetadataRequest;
+import org.apache.flink.kafka_backport.common.requests.MetadataResponse;
+import org.apache.flink.kafka_backport.common.requests.RequestHeader;
+import org.apache.flink.kafka_backport.common.requests.RequestSend;
+import org.apache.flink.kafka_backport.common.utils.Utils;
+import org.apache.flink.kafka_backport.common.Cluster;
+import org.apache.flink.kafka_backport.common.network.Selectable;
+import org.apache.flink.kafka_backport.common.requests.ResponseHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
+ * user-facing producer and consumer clients.
+ * <p>
+ * This class is not thread-safe!
+ */
+public class NetworkClient implements KafkaClient {
+
+    private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
+
+    /* the selector used to perform network i/o */
+    private final Selectable selector;
+
+    /* the current cluster metadata */
+    private final Metadata metadata;
+
+    /* the state of each node's connection */
+    private final ClusterConnectionStates connectionStates;
+
+    /* the set of requests currently being sent or awaiting a response */
+    private final InFlightRequests inFlightRequests;
+
+    /* the socket send buffer size in bytes */
+    private final int socketSendBuffer;
+
+    /* the socket receive size buffer in bytes */
+    private final int socketReceiveBuffer;
+
+    /* the client id used to identify this client in requests to the server */
+    private final String clientId;
+
+    /* a random offset to use when choosing nodes to avoid having all nodes choose the same node */
+    private final int nodeIndexOffset;
+
+    /* the current correlation id to use when sending requests to servers */
+    private int correlation;
+
+    /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
+    private boolean metadataFetchInProgress;
+
+    /* the last timestamp when no broker node is available to connect */
+    private long lastNoNodeAvailableMs;
+
+    public NetworkClient(Selectable selector,
+                         Metadata metadata,
+                         String clientId,
+                         int maxInFlightRequestsPerConnection,
+                         long reconnectBackoffMs,
+                         int socketSendBuffer,
+                         int socketReceiveBuffer) {
+        this.selector = selector;
+        this.metadata = metadata;
+        this.clientId = clientId;
+        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
+        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
+        this.socketSendBuffer = socketSendBuffer;
+        this.socketReceiveBuffer = socketReceiveBuffer;
+        this.correlation = 0;
+        this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
+        this.metadataFetchInProgress = false;
+        this.lastNoNodeAvailableMs = 0;
+    }
+
+    /**
+     * Begin connecting to the given node, return true if we are already connected and ready to send to that node.
+     * 
+     * @param node The node to check
+     * @param now The current timestamp
+     * @return True if we are ready to send to the given node
+     */
+    @Override
+    public boolean ready(Node node, long now) {
+        if (isReady(node, now))
+            return true;
+
+        if (connectionStates.canConnect(node.idString(), now))
+            // if we are interested in sending to a node and we don't have a connection to it, initiate one
+            initiateConnect(node, now);
+
+        return false;
+    }
+
+    /**
+     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
+     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
+     * connections.
+     * 
+     * @param node The node to check
+     * @param now The current timestamp
+     * @return The number of milliseconds to wait.
+     */
+    @Override
+    public long connectionDelay(Node node, long now) {
+        return connectionStates.connectionDelay(node.idString(), now);
+    }
+
+    /**
+     * Check if the connection of the node has failed, based on the connection state. Such connection failure are
+     * usually transient and can be resumed in the next {@link #ready(Node, long)} }
+     * call, but there are cases where transient failures needs to be caught and re-acted upon.
+     *
+     * @param node the node to check
+     * @return true iff the connection has failed and the node is disconnected
+     */
+    @Override
+    public boolean connectionFailed(Node node) {
+        return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
+    }
+
+    /**
+     * Check if the node with the given id is ready to send more requests.
+     * 
+     * @param node The node
+     * @param now The current time in ms
+     * @return true if the node is ready
+     */
+    @Override
+    public boolean isReady(Node node, long now) {
+        String nodeId = node.idString();
+        if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
+            // if we need to update our metadata now declare all requests unready to make metadata requests first
+            // priority
+            return false;
+        else
+            // otherwise we are ready if we are connected and can send more requests
+            return isSendable(nodeId);
+    }
+
+    /**
+     * Are we connected and ready and able to send more requests to the given connection?
+     * 
+     * @param node The node
+     */
+    private boolean isSendable(String node) {
+        return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
+    }
+
+    /**
+     * Return the state of the connection to the given node
+     * 
+     * @param node The node to check
+     * @return The connection state
+     */
+    public ConnectionState connectionState(String node) {
+        return connectionStates.connectionState(node);
+    }
+
+    /**
+     * Queue up the given request for sending. Requests can only be sent out to ready nodes.
+     * 
+     * @param request The request
+     */
+    @Override
+    public void send(ClientRequest request) {
+        String nodeId = request.request().destination();
+        if (!isSendable(nodeId))
+            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
+
+        this.inFlightRequests.add(request);
+        selector.send(request.request());
+    }
+
+    /**
+     * Do actual reads and writes to sockets.
+     * 
+     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately
+     * @param now The current time in milliseconds
+     * @return The list of responses received
+     */
+    @Override
+    public List<ClientResponse> poll(long timeout, long now) {
+        // should we update our metadata?
+        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
+        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
+        long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
+        // if there is no node available to connect, back off refreshing metadata
+        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
+                waitForMetadataFetch);
+        if (metadataTimeout == 0)
+            maybeUpdateMetadata(now);
+        // do the I/O
+        try {
+            this.selector.poll(Math.min(timeout, metadataTimeout));
+        } catch (IOException e) {
+            log.error("Unexpected error during I/O in producer network thread", e);
+        }
+
+        // process completed actions
+        List<ClientResponse> responses = new ArrayList<ClientResponse>();
+        handleCompletedSends(responses, now);
+        handleCompletedReceives(responses, now);
+        handleDisconnections(responses, now);
+        handleConnections();
+
+        // invoke callbacks
+        for (ClientResponse response : responses) {
+            if (response.request().hasCallback()) {
+                try {
+                    response.request().callback().onComplete(response);
+                } catch (Exception e) {
+                    log.error("Uncaught error in request completion:", e);
+                }
+            }
+        }
+
+        return responses;
+    }
+
+    /**
+     * Await all the outstanding responses for requests on the given connection
+     * 
+     * @param node The node to block on
+     * @param now The current time in ms
+     * @return All the collected responses
+     */
+    @Override
+    public List<ClientResponse> completeAll(String node, long now) {
+        try {
+            this.selector.muteAll();
+            this.selector.unmute(node);
+            List<ClientResponse> responses = new ArrayList<ClientResponse>();
+            while (inFlightRequestCount(node) > 0)
+                responses.addAll(poll(Integer.MAX_VALUE, now));
+            return responses;
+        } finally {
+            this.selector.unmuteAll();
+        }
+    }
+
+    /**
+     * Wait for all outstanding requests to complete.
+     */
+    @Override
+    public List<ClientResponse> completeAll(long now) {
+        List<ClientResponse> responses = new ArrayList<ClientResponse>();
+        while (inFlightRequestCount() > 0)
+            responses.addAll(poll(Integer.MAX_VALUE, now));
+        return responses;
+    }
+
+    /**
+     * Get the number of in-flight requests
+     */
+    @Override
+    public int inFlightRequestCount() {
+        return this.inFlightRequests.inFlightRequestCount();
+    }
+
+    /**
+     * Get the number of in-flight requests for a given node
+     */
+    @Override
+    public int inFlightRequestCount(String node) {
+        return this.inFlightRequests.inFlightRequestCount(node);
+    }
+
+    /**
+     * Generate a request header for the given API key
+     * 
+     * @param key The api key
+     * @return A request header with the appropriate client id and correlation id
+     */
+    @Override
+    public RequestHeader nextRequestHeader(ApiKeys key) {
+        return new RequestHeader(key.id, clientId, correlation++);
+    }
+
+    /**
+     * Interrupt the client if it is blocked waiting on I/O.
+     */
+    @Override
+    public void wakeup() {
+        this.selector.wakeup();
+    }
+
+    /**
+     * Close the network client
+     */
+    @Override
+    public void close() {
+        this.selector.close();
+    }
+
+    /**
+     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
+     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
+     * connection if all existing connections are in use. This method will never choose a node for which there is no
+     * existing connection and from which we have disconnected within the reconnect backoff period.
+     * 
+     * @return The node with the fewest in-flight requests.
+     */
+    public Node leastLoadedNode(long now) {
+        List<Node> nodes = this.metadata.fetch().nodes();
+        int inflight = Integer.MAX_VALUE;
+        Node found = null;
+        for (int i = 0; i < nodes.size(); i++) {
+            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
+            Node node = nodes.get(idx);
+            int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
+            if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
+                // if we find an established connection with no in-flight requests we can stop right away
+                return node;
+            } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
+                // otherwise if this is the best we have found so far, record that
+                inflight = currInflight;
+                found = node;
+            }
+        }
+        return found;
+    }
+
+    /**
+     * Handle any completed request send. In particular if no response is expected consider the request complete.
+     * 
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleCompletedSends(List<ClientResponse> responses, long now) {
+        // if no response is expected then when the send is completed, return it
+        for (Send send : this.selector.completedSends()) {
+            ClientRequest request = this.inFlightRequests.lastSent(send.destination());
+            if (!request.expectResponse()) {
+                this.inFlightRequests.completeLastSent(send.destination());
+                responses.add(new ClientResponse(request, now, false, null));
+            }
+        }
+    }
+
+    /**
+     * Handle any completed receives and update the response list with the responses received.
+     * 
+     * @param responses The list of responses to update
+     * @param now The current time
+     */
+    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
+        for (NetworkReceive receive : this.selector.completedReceives()) {
+            String source = receive.source();
+            ClientRequest req = inFlightRequests.completeNext(source);
+            ResponseHeader header = ResponseHeader.parse(receive.payload());
+            short apiKey = req.request().header().apiKey();
+            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
+            correlate(req.request().header(), header);
+            if (apiKey == ApiKeys.METADATA.id) {
+                handleMetadataResponse(req.request().header(), body, now);
+            } else {
+                // need to add body/header to response here
+                responses.add(new ClientResponse(req, now, false, body));
+            }
+        }
+    }
+
+    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
+        this.metadataFetchInProgress = false;
+        MetadataResponse response = new MetadataResponse(body);
+        Cluster cluster = response.cluster();
+        // check if any topics metadata failed to get updated
+        if (response.errors().size() > 0) {
+            log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
+        }
+        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
+        // created which means we will get errors and no nodes until it exists
+        if (cluster.nodes().size() > 0) {
+            this.metadata.update(cluster, now);
+        } else {
+            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
+            this.metadata.failedUpdate(now);
+        }
+    }
+
+    /**
+     * Handle any disconnected connections
+     * 
+     * @param responses The list of responses that completed with the disconnection
+     * @param now The current time
+     */
+    private void handleDisconnections(List<ClientResponse> responses, long now) {
+        for (String node : this.selector.disconnected()) {
+            connectionStates.disconnected(node);
+            log.debug("Node {} disconnected.", node);
+            for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
+                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
+                ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
+                if (requestKey == ApiKeys.METADATA)
+                    metadataFetchInProgress = false;
+                else
+                    responses.add(new ClientResponse(request, now, true, null));
+            }
+        }
+        // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
+        if (this.selector.disconnected().size() > 0)
+            this.metadata.requestUpdate();
+    }
+
+    /**
+     * Record any newly completed connections
+     */
+    private void handleConnections() {
+        for (String node : this.selector.connected()) {
+            log.debug("Completed connection to node {}", node);
+            this.connectionStates.connected(node);
+        }
+    }
+
+    /**
+     * Validate that the response corresponds to the request we expect or else explode
+     */
+    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
+        if (requestHeader.correlationId() != responseHeader.correlationId())
+            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
+                    + ") does not match request (" + requestHeader.correlationId() + ")");
+    }
+
+    /**
+     * Create a metadata request for the given topics
+     */
+    private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
+        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
+        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
+        return new ClientRequest(now, true, send, null);
+    }
+
+    /**
+     * Add a metadata request to the list of sends if we can make one
+     */
+    private void maybeUpdateMetadata(long now) {
+        // Beware that the behavior of this method and the computation of timeouts for poll() are
+        // highly dependent on the behavior of leastLoadedNode.
+        Node node = this.leastLoadedNode(now);
+        if (node == null) {
+            log.debug("Give up sending metadata request since no node is available");
+            // mark the timestamp for no node available to connect
+            this.lastNoNodeAvailableMs = now;
+            return;
+        }
+        String nodeConnectionId = node.idString();
+
+
+        if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) {
+            Set<String> topics = metadata.topics();
+            this.metadataFetchInProgress = true;
+            ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics);
+            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
+            this.selector.send(metadataRequest.request());
+            this.inFlightRequests.add(metadataRequest);
+        } else if (connectionStates.canConnect(nodeConnectionId, now)) {
+            // we don't have a connection to this node right now, make one
+            log.debug("Initialize connection to node {} for sending metadata request", node.id());
+            initiateConnect(node, now);
+            // If initiateConnect failed immediately, this node will be put into blackout and we
+            // should allow immediately retrying in case there is another candidate node. If it
+            // is still connecting, the worst case is that we end up setting a longer timeout
+            // on the next round and then wait for the response.
+        } else { // connected, but can't send more OR connecting
+            // In either case, we just need to wait for a network event to let us know the selected
+            // connection might be usable again.
+            this.lastNoNodeAvailableMs = now;
+        }
+    }
+
+    /**
+     * Initiate a connection to the given node
+     */
+    private void initiateConnect(Node node, long now) {
+        String nodeConnectionId = node.idString();
+        try {
+            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
+            this.connectionStates.connecting(nodeConnectionId, now);
+            selector.connect(nodeConnectionId,
+                             new InetSocketAddress(node.host(), node.port()),
+                             this.socketSendBuffer,
+                             this.socketReceiveBuffer);
+        } catch (IOException e) {
+            /* attempt failed, we'll try again after the backoff */
+            connectionStates.disconnected(nodeConnectionId);
+            /* maybe the problem is our metadata, update it */
+            metadata.requestUpdate();
+            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/RequestCompletionHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/RequestCompletionHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/RequestCompletionHandler.java
new file mode 100644
index 0000000..a275e96
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/RequestCompletionHandler.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * Callback through which the issuer of a request is notified that the request has finished. The handler is invoked
+ * once the request is complete and its response has been received, and it is also invoked when a disconnection
+ * happens while the request is being handled.
+ */
+public interface RequestCompletionHandler {
+
+    /**
+     * Invoked when the request this handler was attached to has finished.
+     *
+     * @param response The response produced for the finished request
+     */
+    void onComplete(ClientResponse response);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/CommitType.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/CommitType.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/CommitType.java
new file mode 100644
index 0000000..d261bd6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/CommitType.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The mode selector accepted by the {@code commit} methods of {@link Consumer}.
+ */
+public enum CommitType {
+
+    /** NOTE(review): presumably the commit call waits for the commit to finish -- confirm against KafkaConsumer. */
+    SYNC,
+
+    /** NOTE(review): presumably the commit call returns without waiting -- confirm against KafkaConsumer. */
+    ASYNC
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/Consumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/Consumer.java
new file mode 100644
index 0000000..12b48ec
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/Consumer.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.Metric;
+import org.apache.flink.kafka_backport.common.MetricName;
+import org.apache.flink.kafka_backport.common.PartitionInfo;
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The operations shared by the consumer implementations listed below. Each method is declared with the same name
+ * and parameters as the {@link KafkaConsumer} method its {@code @see} tag points to.
+ *
+ * @see KafkaConsumer
+ * @see MockConsumer
+ */
+public interface Consumer<K, V> extends Closeable {
+
+    /** @see KafkaConsumer#subscriptions() */
+    Set<TopicPartition> subscriptions();
+
+    /** @see KafkaConsumer#subscribe(String...) */
+    void subscribe(String... topics);
+
+    /** @see KafkaConsumer#subscribe(TopicPartition...) */
+    void subscribe(TopicPartition... partitions);
+
+    /** @see KafkaConsumer#unsubscribe(String...) */
+    void unsubscribe(String... topics);
+
+    /** @see KafkaConsumer#unsubscribe(TopicPartition...) */
+    void unsubscribe(TopicPartition... partitions);
+
+    /** @see KafkaConsumer#poll(long) */
+    ConsumerRecords<K, V> poll(long timeout);
+
+    /** @see KafkaConsumer#commit(CommitType) */
+    void commit(CommitType commitType);
+
+    /** @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback) */
+    void commit(CommitType commitType, ConsumerCommitCallback callback);
+
+    /** @see KafkaConsumer#commit(Map, CommitType) */
+    void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
+
+    /** @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback) */
+    void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);
+
+    /** @see KafkaConsumer#seek(TopicPartition, long) */
+    void seek(TopicPartition partition, long offset);
+
+    /** @see KafkaConsumer#seekToBeginning(TopicPartition...) */
+    void seekToBeginning(TopicPartition... partitions);
+
+    /** @see KafkaConsumer#seekToEnd(TopicPartition...) */
+    void seekToEnd(TopicPartition... partitions);
+
+    /** @see KafkaConsumer#position(TopicPartition) */
+    long position(TopicPartition partition);
+
+    /** @see KafkaConsumer#committed(TopicPartition) */
+    long committed(TopicPartition partition);
+
+    /** @see KafkaConsumer#metrics() */
+    Map<MetricName, ? extends Metric> metrics();
+
+    /** @see KafkaConsumer#partitionsFor(String) */
+    List<PartitionInfo> partitionsFor(String topic);
+
+    /**
+     * Redeclares {@link Closeable#close()} without a checked exception.
+     *
+     * @see KafkaConsumer#close()
+     */
+    @Override
+    void close();
+
+    /** @see KafkaConsumer#wakeup() */
+    void wakeup();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerCommitCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerCommitCallback.java
new file mode 100644
index 0000000..4d90bfc
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerCommitCallback.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
+ * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
+ */
+public interface ConsumerCommitCallback {
+
+    /**
+     * A callback method the user can implement to provide asynchronous handling of commit request completion.
+     * This method will be called when the commit request sent to the server has been acknowledged.
+     *
+     * @param offsets A map of the offsets that this callback applies to
+     * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
+     */
+    void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerConfig.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerConfig.java
new file mode 100644
index 0000000..c1f79bf
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerConfig.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.clients.CommonClientConfigs;
+import org.apache.flink.kafka_backport.clients.consumer.internals.NoOpConsumerRebalanceCallback;
+import org.apache.flink.kafka_backport.common.config.AbstractConfig;
+import org.apache.flink.kafka_backport.common.config.ConfigDef;
+import org.apache.flink.kafka_backport.common.config.ConfigDef.Importance;
+import org.apache.flink.kafka_backport.common.config.ConfigDef.Type;
+import org.apache.flink.kafka_backport.common.serialization.Deserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.kafka_backport.common.config.ConfigDef.Range.atLeast;
+import static org.apache.flink.kafka_backport.common.config.ConfigDef.ValidString.in;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * The consumer configuration keys
+ */
+public class ConsumerConfig extends AbstractConfig {
+    private static final ConfigDef CONFIG;
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * <code>group.id</code>
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";
+
+    /**
+     * <code>session.timeout.ms</code>
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+
+    /**
+     * <code>bootstrap.servers</code>
+     */
+    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+
+    /**
+     * <code>enable.auto.commit</code>
+     */
+    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
+    private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";
+
+    /**
+     * <code>auto.commit.interval.ms</code>
+     */
+    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
+    private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
+
+    /**
+     * <code>partition.assignment.strategy</code>
+     */
+    public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
+    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used";
+
+    /**
+     * <code>auto.offset.reset</code>
+     */
+    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
+    private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset</li><li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";
+
+    /**
+     * <code>fetch.min.bytes</code>
+     */
+    public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes"; // NOTE(review): FETCH_MIN_BYTES_DOC describes a default of 1 byte, but the static CONFIG block registers 1024 -- confirm which is intended
+    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available the request will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate which can improve server throughput a bit at the cost of some additional latency.";
+
+    /**
+     * <code>fetch.max.wait.ms</code>
+     */
+    public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
+    private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
+
+    /** <code>metadata.max.age.ms</code> */
+    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+
+    /**
+     * <code>max.partition.fetch.bytes</code>
+     */
+    public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
+    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
+
+    /** <code>send.buffer.bytes</code> */
+    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+
+    /** <code>receive.buffer.bytes</code> */
+    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+
+    /**
+     * <code>client.id</code>
+     */
+    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
+
+    /**
+     * <code>reconnect.backoff.ms</code>
+     */
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+
+    /**
+     * <code>retry.backoff.ms</code>
+     */
+    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+
+    /**
+     * <code>metrics.sample.window.ms</code>
+     */
+    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
+
+    /**
+     * <code>metrics.num.samples</code>
+     */
+    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+
+    /**
+     * <code>metric.reporters</code>
+     */
+    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
+
+    /**
+     * <code>rebalance.callback.class</code>
+     */
+    public static final String CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG = "rebalance.callback.class";
+    private static final String CONSUMER_REBALANCE_CALLBACK_CLASS_DOC = "A user-provided callback to execute when partition assignments change.";
+
+    /**
+     * <code>check.crcs</code>
+     */
+    public static final String CHECK_CRCS_CONFIG = "check.crcs";
+    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
+    
+    /** <code>key.deserializer</code> */
+    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
+    private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
+
+    /** <code>value.deserializer</code> */
+    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
+    private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
+
+    /** <code>connections.max.idle.ms</code> */
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+
+
+    static {
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
+                                        Type.LIST,
+                                        Importance.HIGH,
+                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
+                                .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
+                                .define(SESSION_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        30000,
+                                        Importance.HIGH,
+                                        SESSION_TIMEOUT_MS_DOC)
+                                .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+                                        Type.STRING,
+                                        "range",
+                                        in("range", "roundrobin"),
+                                        Importance.MEDIUM,
+                                        PARTITION_ASSIGNMENT_STRATEGY_DOC)
+                                .define(METADATA_MAX_AGE_CONFIG,
+                                        Type.LONG,
+                                        5 * 60 * 1000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
+                                .define(ENABLE_AUTO_COMMIT_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.MEDIUM,
+                                        ENABLE_AUTO_COMMIT_DOC)
+                                .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
+                                        Type.LONG,
+                                        5000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        AUTO_COMMIT_INTERVAL_MS_DOC)
+                                .define(CLIENT_ID_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
+                                        Type.INT,
+                                        1 * 1024 * 1024,
+                                        atLeast(0),
+                                        Importance.HIGH,
+                                        MAX_PARTITION_FETCH_BYTES_DOC)
+                                .define(SEND_BUFFER_CONFIG,
+                                        Type.INT,
+                                        128 * 1024,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.SEND_BUFFER_DOC)
+                                .define(RECEIVE_BUFFER_CONFIG,
+                                        Type.INT,
+                                        32 * 1024,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                                .define(FETCH_MIN_BYTES_CONFIG,
+                                        Type.INT,
+                                        1024,
+                                        atLeast(0),
+                                        Importance.HIGH,
+                                        FETCH_MIN_BYTES_DOC)
+                                .define(FETCH_MAX_WAIT_MS_CONFIG,
+                                        Type.INT,
+                                        500,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        FETCH_MAX_WAIT_MS_DOC)
+                                .define(RECONNECT_BACKOFF_MS_CONFIG,
+                                        Type.LONG,
+                                        50L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                                .define(RETRY_BACKOFF_MS_CONFIG,
+                                        Type.LONG,
+                                        100L,
+                                        atLeast(0L),
+                                        Importance.LOW,
+                                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                                .define(AUTO_OFFSET_RESET_CONFIG,
+                                        Type.STRING,
+                                        "latest",
+                                        in("latest", "earliest", "none"),
+                                        Importance.MEDIUM,
+                                        AUTO_OFFSET_RESET_DOC)
+                                .define(CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        NoOpConsumerRebalanceCallback.class,
+                                        Importance.LOW,
+                                        CONSUMER_REBALANCE_CALLBACK_CLASS_DOC)
+                                .define(CHECK_CRCS_CONFIG,
+                                        Type.BOOLEAN,
+                                        true,
+                                        Importance.LOW,
+                                        CHECK_CRCS_DOC)                                
+                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                                        Type.LONG,
+                                        30000,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                                .define(METRICS_NUM_SAMPLES_CONFIG,
+                                        Type.INT,
+                                        2,
+                                        atLeast(1),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        KEY_DESERIALIZER_CLASS_DOC)
+                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
+                                        Type.CLASS,
+                                        Importance.HIGH,
+                                        VALUE_DESERIALIZER_CLASS_DOC)
+                                /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
+                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                                        Type.LONG,
+                                        9 * 60 * 1000,
+                                        Importance.MEDIUM,
+                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
+    }
+
+    /** Returns a copy of {@code configs} with the class of each non-null deserializer stored under its config key; {@code configs} itself is not modified. */
+    public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
+                                                              Deserializer<?> keyDeserializer,
+                                                              Deserializer<?> valueDeserializer) {
+        Map<String, Object> newConfigs = new HashMap<String, Object>(configs);
+        if (keyDeserializer != null)
+            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
+        if (valueDeserializer != null) // guard the value deserializer itself; testing keyDeserializer here caused an NPE or a silently skipped value entry
+            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
+        return newConfigs;
+    }
+
+    /** Returns a copy of {@code properties} with the class name of each non-null deserializer stored under its config key; {@code properties} itself is not modified. */
+    public static Properties addDeserializerToConfig(Properties properties,
+                                                     Deserializer<?> keyDeserializer, Deserializer<?> valueDeserializer) {
+        Properties newProperties = new Properties();
+        newProperties.putAll(properties); // real copy; new Properties(properties) would only install them as defaults
+        if (keyDeserializer != null)
+            newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
+        if (valueDeserializer != null) // guard the value deserializer itself; testing keyDeserializer here caused an NPE or a silently skipped value entry
+            newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
+        return newProperties;
+    }
+
+    public ConsumerConfig(Map<?, ?> props) {
+        super(CONFIG, props);
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CONFIG.toHtmlTable());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRebalanceCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRebalanceCallback.java
new file mode 100644
index 0000000..8f2cd75
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRebalanceCallback.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+
+import java.util.Collection;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
+ * consumer changes.
+ * <p>
+ * This is applicable when the consumer is having Kafka auto-manage group membership. If the consumer directly subscribes to partitions,
+ * those partitions will never be reassigned and this callback is not applicable.
+ * <p>
+ * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
+ * of the members changes. This can occur when processes die, new process instances are added or old instances come back to life after failure.
+ * <p>
+ * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
+ * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
+ * the offset gets saved.
+ * <p>
+ * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
+ * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
+ * number of page views per user for each five minute window. Let's say the topic is partitioned by the user id so that
+ * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
+ * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However if a
+ * partition is reassigned it may want to automatically trigger a flush of this cache, before the new owner takes over
+ * consumption.
+ * <p>
+ * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
+ * <p>
+ * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to
+ * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the
+ * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
+ * partition has its {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
+ * <p>
+ * Here is pseudo-code for a callback implementation for saving offsets:
+ * <pre>
+ * {@code
+ *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
+ *       public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *           // read the offsets from an external store using some custom code not described here
+ *           for(TopicPartition partition: partitions)
+ *              consumer.seek(partition, readOffsetFromExternalStore(partition));
+ *       }
+ *       public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
+ *           // save the offsets in an external store using some custom code not described here
+ *           for(TopicPartition partition: partitions)
+ *              saveOffsetInExternalStore(consumer.position(partition));
+ *       }
+ *   }
+ * }
+ * </pre>
+ */
+public interface ConsumerRebalanceCallback {
+
+    /**
+     * A callback method the user can implement to provide handling of customized offsets on completion of a successful
+     * partition re-assignment. This method will be called after an offset re-assignment completes and before the
+     * consumer starts fetching data.
+     * <p>
+     * It is guaranteed that all the processes in a consumer group will execute their
+     * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
+     * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
+     *
+     * @param consumer Reference to the consumer for convenience
+     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
+     *            assigned to the consumer)
+     */
+    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+
+    /**
+     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
+     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
+     * stops fetching data. It is recommended that offsets should be committed in this callback to either Kafka or a
+     * custom offset store to prevent duplicate data.
+     * <p>
+     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
+     *
+     * @param consumer  Reference to the consumer for convenience
+     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
+     */
+    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecord.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecord.java
new file mode 100644
index 0000000..44096ee
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecord.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
+ * record is being received and an offset that points to the record in a Kafka partition.
+ */
+public final class ConsumerRecord<K, V> {
+    private final String topic;
+    private final int partition;
+    private final long offset;
+    private final K key;
+    private final V value;
+
+    /**
+     * Create a record from its topic, partition, offset, key and value
+     * @param topic The topic this record is received from; an IllegalArgumentException is thrown if it is null
+     * @param partition The partition of the topic this record is received from
+     * @param offset The offset of this record in the corresponding Kafka partition
+     * @param key The key of the record; stored as given without a null check, so it may be null
+     * @param value The record contents
+     */
+    public ConsumerRecord(String topic, int partition, long offset, K key, V value) {
+        if (topic == null)
+            throw new IllegalArgumentException("Topic cannot be null");
+        this.topic = topic;
+        this.partition = partition;
+        this.offset = offset;
+        this.key = key;
+        this.value = value;
+    }
+
+    /**
+     * The topic this record is received from
+     */
+    public String topic() {
+        return this.topic;
+    }
+
+    /**
+     * The partition from which this record is received
+     */
+    public int partition() {
+        return this.partition;
+    }
+
+    /**
+     * The key (or null if no key is specified)
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * The value
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * The position of this record in the corresponding Kafka partition.
+     */
+    public long offset() {
+        return offset;
+    }
+
+    @Override
+    public String toString() {
+        return "ConsumerRecord(topic = " + topic() + ", partition = " + partition() + ", offset = " + offset()
+                + ", key = " + key + ", value = " + value + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecords.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecords.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecords.java
new file mode 100644
index 0000000..a9cfc84
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerRecords.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.TopicPartition;
+import org.apache.flink.kafka_backport.common.utils.AbstractIterator;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+
+/**
+ * A container that holds the list of {@link ConsumerRecord} per {@link TopicPartition}, as returned by a
+ * {@link Consumer#poll(long)} operation. Iterating it walks the records of every partition it holds; use
+ * {@link #records(TopicPartition)} or {@link #records(String)} to look at a single partition or topic.
+ */
+public class ConsumerRecords<K, V> implements Iterable<ConsumerRecord<K, V>> {
+    public static final ConsumerRecords<Object, Object> EMPTY =
+            new ConsumerRecords<Object, Object>(Collections.EMPTY_MAP);
+
+    private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
+
+    public ConsumerRecords(Map<TopicPartition, List<ConsumerRecord<K, V>>> records) {
+        this.records = records;
+    }
+
+    /**
+     * Get just the records for the given partition (an empty list if the partition has no entry)
+     * @param partition The partition to get records for
+     */
+    public Iterable<ConsumerRecord<K, V>> records(TopicPartition partition) {
+        List<ConsumerRecord<K, V>> recs = this.records.get(partition);
+        if (recs == null)
+            return Collections.emptyList();
+        else
+            return recs;
+    }
+
+    /**
+     * Get just the records for the given topic, concatenated over all partitions of that topic
+     * @param topic The topic to get records for
+     * @throws IllegalArgumentException if {@code topic} is null
+     */
+    public Iterable<ConsumerRecord<K, V>> records(String topic) {
+        if (topic == null)
+            throw new IllegalArgumentException("Topic must be non-null.");
+        List<List<ConsumerRecord<K, V>>> recs = new ArrayList<List<ConsumerRecord<K, V>>>();
+        for (Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry : records.entrySet()) {
+            if (entry.getKey().topic().equals(topic))
+                recs.add(entry.getValue());
+        }
+        return new ConcatenatedIterable<K, V>(recs);
+    }
+
+    @Override
+    public Iterator<ConsumerRecord<K, V>> iterator() {
+        return new ConcatenatedIterable<K, V>(records.values()).iterator();
+    }
+
+    /**
+     * The number of records for all topics
+     */
+    public int count() {
+        int count = 0;
+        for (List<ConsumerRecord<K, V>> recs: this.records.values())
+            count += recs.size();
+        return count;
+    }
+
+    private static class ConcatenatedIterable<K, V> implements Iterable<ConsumerRecord<K, V>> {
+        private final Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables;
+
+        public ConcatenatedIterable(Iterable<? extends Iterable<ConsumerRecord<K, V>>> iterables) {
+            this.iterables = iterables;
+        }
+
+        @Override
+        public Iterator<ConsumerRecord<K, V>> iterator() {
+            return new AbstractIterator<ConsumerRecord<K, V>>() {
+                Iterator<? extends Iterable<ConsumerRecord<K, V>>> iters = iterables.iterator();
+                Iterator<ConsumerRecord<K, V>> current;
+
+                public ConsumerRecord<K, V> makeNext() {
+                    // Loop, not a single check: an empty nested iterable must be skipped before calling next()
+                    while (current == null || !current.hasNext()) {
+                        if (iters.hasNext())
+                            current = iters.next().iterator();
+                        else
+                            return allDone();
+                    }
+                    return current.next();
+                }
+            };
+        }
+    }
+
+    public boolean isEmpty() {
+        return records.isEmpty();
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <K, V> ConsumerRecords<K, V> empty() {
+        return (ConsumerRecords<K, V>) EMPTY;
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerWakeupException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerWakeupException.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerWakeupException.java
new file mode 100644
index 0000000..7185c87
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/clients/consumer/ConsumerWakeupException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.flink.kafka_backport.clients.consumer;
+
+import org.apache.flink.kafka_backport.common.KafkaException;
+
+// ----------------------------------------------------------------------------
+//  This class is copied from the Apache Kafka project.
+// 
+//  The class is part of a "backport" of the new consumer API, in order to
+//  give Flink access to its functionality until the API is properly released.
+// 
+//  This is a temporary workaround!
+// ----------------------------------------------------------------------------
+/** {@link KafkaException} subtype declaring no state; presumably signals a consumer wakeup (TODO confirm). */
+public class ConsumerWakeupException extends KafkaException {
+    private static final long serialVersionUID = 1L;
+
+}


Mime
View raw message