flink-commits mailing list archives

From se...@apache.org
Subject [11/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Add comments to all backported kafka sources and move them to 'org.apache.flink.kafka_backport'
Date Thu, 27 Aug 2015 11:25:28 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/CommonClientConfigs.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/CommonClientConfigs.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/CommonClientConfigs.java
deleted file mode 100644
index da44734..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/CommonClientConfigs.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.copied.clients;
-
-/**
- * Some configurations shared by both producer and consumer
- */
-public class CommonClientConfigs {
-    
-    /*
-     * NOTE: DO NOT CHANGE EITHER THE CONFIG NAMES OR THEIR JAVA VARIABLE NAMES, AS THESE ARE PART OF THE PUBLIC API AND CHANGES WILL BREAK USER CODE.
-     */
-
-    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    public static final String BOOTSTRAP_SERVERS_DOC = "A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form "
-                                                       + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the initial connection to "
-                                                       + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of "
-                                                       + "servers (you may want more than one, though, in case a server is down).";
-    
-    public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
-    public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
-    
-    public static final String SEND_BUFFER_CONFIG = "send.buffer.bytes";
-    public static final String SEND_BUFFER_DOC = "The size of the TCP send buffer (SO_SNDBUF) to use when sending data.";
-
-    public static final String RECEIVE_BUFFER_CONFIG = "receive.buffer.bytes";
-    public static final String RECEIVE_BUFFER_DOC = "The size of the TCP receive buffer (SO_RCVBUF) to use when reading data.";
-
-    public static final String CLIENT_ID_CONFIG = "client.id";
-    public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";
-
-    public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
-    public static final String RECONNECT_BACKOFF_MS_DOC = "The amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all requests sent by the consumer to the broker.";
-
-    public static final String RETRY_BACKOFF_MS_CONFIG = "retry.backoff.ms";
-    public static final String RETRY_BACKOFF_MS_DOC = "The amount of time to wait before attempting to retry a failed fetch request to a given topic partition. This avoids repeated fetching-and-failing in a tight loop.";
-    
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = "metrics.sample.window.ms";
-    public static final String METRICS_SAMPLE_WINDOW_MS_DOC = "The window of time a metrics sample is computed over.";
-
-    public static final String METRICS_NUM_SAMPLES_CONFIG = "metrics.num.samples";
-    public static final String METRICS_NUM_SAMPLES_DOC = "The number of samples maintained to compute metrics.";
-
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = "metric.reporters";
-    public static final String METRIC_REPORTER_CLASSES_DOC = "A list of classes to use as metrics reporters. Implementing the <code>MetricReporter</code> interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.";
-
-    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = "connections.max.idle.ms";
-    public static final String CONNECTIONS_MAX_IDLE_MS_DOC = "Close idle connections after the number of milliseconds specified by this config.";
-}
\ No newline at end of file
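
For orientation, a minimal sketch of how these shared keys are typically assembled into client properties. The broker list, client id, and max-age value below are placeholders, not taken from this commit.

    import java.util.Properties;

    import org.apache.kafka.copied.clients.CommonClientConfigs;

    public class ClientConfigSketch {
        public static Properties baseClientProperties() {
            Properties props = new Properties();
            // Two bootstrap servers are enough to survive one broker being down;
            // the full cluster membership is discovered from them.
            props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "host1:9092,host2:9092");
            props.put(CommonClientConfigs.CLIENT_ID_CONFIG, "flink-kafka-source");
            // Refresh metadata at least every 5 minutes, even without leader changes.
            props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, "300000");
            return props;
        }
    }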

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/ConnectionState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/ConnectionState.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/ConnectionState.java
deleted file mode 100644
index 6ff294b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/ConnectionState.java
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients;
-
-/**
- * The states of a node connection
- */
-public enum ConnectionState {
-    DISCONNECTED, CONNECTING, CONNECTED
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/InFlightRequests.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/InFlightRequests.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/InFlightRequests.java
deleted file mode 100644
index cb3d38d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/InFlightRequests.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients;
-
-import java.util.ArrayDeque;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * The set of requests which have been sent or are being sent but haven't yet received a response
- */
-final class InFlightRequests {
-
-    private final int maxInFlightRequestsPerConnection;
-    private final Map<String, Deque<ClientRequest>> requests = new HashMap<String, Deque<ClientRequest>>();
-
-    public InFlightRequests(int maxInFlightRequestsPerConnection) {
-        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
-    }
-
-    /**
-     * Add the given request to the queue for the connection it was directed to
-     */
-    public void add(ClientRequest request) {
-        Deque<ClientRequest> reqs = this.requests.get(request.request().destination());
-        if (reqs == null) {
-            reqs = new ArrayDeque<ClientRequest>();
-            this.requests.put(request.request().destination(), reqs);
-        }
-        reqs.addFirst(request);
-    }
-
-    /**
-     * Get the request queue for the given node
-     */
-    private Deque<ClientRequest> requestQueue(String node) {
-        Deque<ClientRequest> reqs = requests.get(node);
-        if (reqs == null || reqs.isEmpty())
-            throw new IllegalStateException("Response from server for which there are no in-flight requests.");
-        return reqs;
-    }
-
-    /**
-     * Get the oldest request (the one that will be completed next) for the given node
-     */
-    public ClientRequest completeNext(String node) {
-        return requestQueue(node).pollLast();
-    }
-
-    /**
-     * Get the last request we sent to the given node (but don't remove it from the queue)
-     * @param node The node id
-     */
-    public ClientRequest lastSent(String node) {
-        return requestQueue(node).peekFirst();
-    }
-
-    /**
-     * Complete the last request that was sent to a particular node.
-     * @param node The node the request was sent to
-     * @return The request
-     */
-    public ClientRequest completeLastSent(String node) {
-        return requestQueue(node).pollFirst();
-    }
-
-    /**
-     * Can we send more requests to this node?
-     * 
-     * @param node Node in question
-     * @return true iff the connection can accept another request (nothing still mid-send and below the in-flight limit)
-     */
-    public boolean canSendMore(String node) {
-        Deque<ClientRequest> queue = requests.get(node);
-        return queue == null || queue.isEmpty() ||
-               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
-    }
-
-    /**
-     * Return the number of inflight requests directed at the given node
-     * @param node The node
-     * @return The request count.
-     */
-    public int inFlightRequestCount(String node) {
-        Deque<ClientRequest> queue = requests.get(node);
-        return queue == null ? 0 : queue.size();
-    }
-
-    /**
-     * Count all in-flight requests for all nodes
-     */
-    public int inFlightRequestCount() {
-        int total = 0;
-        for (Deque<ClientRequest> deque : this.requests.values())
-            total += deque.size();
-        return total;
-    }
-
-    /**
-     * Clear out all the in-flight requests for the given node and return them
-     * 
-     * @param node The node
-     * @return All the in-flight requests for that node that have been removed
-     */
-    public Iterable<ClientRequest> clearAll(String node) {
-        Deque<ClientRequest> reqs = requests.get(node);
-        if (reqs == null) {
-            return Collections.emptyList();
-        } else {
-            return requests.remove(node);
-        }
-    }
-
-}
\ No newline at end of file
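
The queue discipline above is worth spelling out: add() pushes at the head of the per-node deque and completeNext() pops from the tail, so responses are matched to requests in the order they were sent. A self-contained illustration, with strings standing in for ClientRequest:

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class InFlightOrderDemo {
        public static void main(String[] args) {
            Deque<String> perNodeQueue = new ArrayDeque<String>();
            perNodeQueue.addFirst("request-1");   // sent first (add)
            perNodeQueue.addFirst("request-2");   // sent second (add)
            // lastSent() corresponds to peekFirst(): request-2
            System.out.println("last sent: " + perNodeQueue.peekFirst());
            // completeNext() corresponds to pollLast(): FIFO completion order
            System.out.println("completes first: " + perNodeQueue.pollLast());
        }
    }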

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/KafkaClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/KafkaClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/KafkaClient.java
deleted file mode 100644
index 9c53d0d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/KafkaClient.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients;
-
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.requests.RequestHeader;
-
-import java.io.Closeable;
-import java.util.List;
-
-/**
- * The interface for {@link NetworkClient}
- */
-public interface KafkaClient extends Closeable {
-
-    /**
-     * Check if we are currently ready to send another request to the given node but don't attempt to connect if we
-     * aren't.
-     * 
-     * @param node The node to check
-     * @param now The current timestamp
-     */
-    public boolean isReady(Node node, long now);
-
-    /**
-     * Initiate a connection to the given node (if necessary), and return true if already connected. The readiness of a
-     * node will change only when poll is invoked.
-     * 
-     * @param node The node to connect to.
-     * @param now The current time
-     * @return true iff we are ready to immediately initiate the sending of another request to the given node.
-     */
-    public boolean ready(Node node, long now);
-
-    /**
-     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
-     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
-     * connections.
-     * 
-     * @param node The node to check
-     * @param now The current timestamp
-     * @return The number of milliseconds to wait.
-     */
-    public long connectionDelay(Node node, long now);
-
-    /**
-     * Check if the connection of the node has failed, based on the connection state. Such connection failures are
-     * usually transient and can be resumed in the next {@link #ready(Node, long)}
-     * call, but there are cases where transient failures need to be caught and reacted upon.
-     *
-     * @param node the node to check
-     * @return true iff the connection has failed and the node is disconnected
-     */
-    public boolean connectionFailed(Node node);
-
-    /**
-     * Queue up the given request for sending. Requests can only be sent on ready connections.
-     * 
-     * @param request The request
-     */
-    public void send(ClientRequest request);
-
-    /**
-     * Do actual reads and writes from sockets.
-     * 
-     * @param timeout The maximum amount of time to wait for responses in ms
-     * @param now The current time in ms
-     * @throws IllegalStateException If a request is sent to an unready node
-     */
-    public List<ClientResponse> poll(long timeout, long now);
-
-    /**
-     * Complete all in-flight requests for a given connection
-     * 
-     * @param id The connection to complete requests for
-     * @param now The current time in ms
-     * @return All requests that complete during this time period.
-     */
-    public List<ClientResponse> completeAll(String id, long now);
-
-    /**
-     * Complete all in-flight requests
-     * 
-     * @param now The current time in ms
-     * @return All requests that complete during this time period.
-     */
-    public List<ClientResponse> completeAll(long now);
-
-    /**
-     * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection,
-     * but will potentially choose a node for which we don't yet have a connection if all existing connections are in
-     * use.
-     * 
-     * @param now The current time in ms
-     * @return The node with the fewest in-flight requests.
-     */
-    public Node leastLoadedNode(long now);
-
-    /**
-     * The number of currently in-flight requests for which we have not yet returned a response
-     */
-    public int inFlightRequestCount();
-
-    /**
-     * Get the total in-flight requests for a particular node
-     * 
-     * @param nodeId The id of the node
-     */
-    public int inFlightRequestCount(String nodeId);
-
-    /**
-     * Generate a request header for the next request
-     * 
-     * @param key The API key of the request
-     */
-    public RequestHeader nextRequestHeader(ApiKeys key);
-
-    /**
-     * Wake up the client if it is currently blocked waiting for I/O
-     */
-    public void wakeup();
-
-}
\ No newline at end of file
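
As a reading aid, a sketch of the calling convention this interface implies: a request may only be send()-ed once ready() has returned true, and poll() is what actually moves bytes and surfaces responses. ClientRequest and ClientResponse are assumed importable from the same backported package; their construction is elided since those classes are not part of this excerpt.

    import org.apache.kafka.copied.clients.ClientRequest;
    import org.apache.kafka.copied.clients.ClientResponse;
    import org.apache.kafka.copied.clients.KafkaClient;
    import org.apache.kafka.copied.common.Node;

    import java.util.List;

    public class KafkaClientUsageSketch {
        // Blocks until the node is ready, then sends one request and polls once.
        // Loops forever if the node stays unreachable; real callers bound this.
        static List<ClientResponse> sendOne(KafkaClient client, Node node, ClientRequest request) {
            long now = System.currentTimeMillis();
            while (!client.ready(node, now)) {
                // ready() initiates the connection; poll() drives it to completion,
                // waiting no longer than the connection-state backoff suggests.
                client.poll(client.connectionDelay(node, now), now);
                now = System.currentTimeMillis();
            }
            client.send(request);
            return client.poll(100L, now); // wait up to 100 ms for responses
        }
    }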

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/Metadata.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/Metadata.java
deleted file mode 100644
index 5210ce0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/Metadata.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients;
-
-import org.apache.kafka.copied.common.Cluster;
-import org.apache.kafka.copied.common.errors.TimeoutException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * A class encapsulating some of the logic around metadata.
- * <p>
- * This class is shared by the client thread (for partitioning) and the background sender thread.
- * 
- * Metadata is maintained for only a subset of topics, which can be added to over time. Requesting metadata for a
- * topic we do not yet have any metadata for will trigger a metadata update.
- */
-public final class Metadata {
-
-    private static final Logger log = LoggerFactory.getLogger(Metadata.class);
-
-    private final long refreshBackoffMs;
-    private final long metadataExpireMs;
-    private int version;
-    private long lastRefreshMs;
-    private long lastSuccessfulRefreshMs;
-    private Cluster cluster;
-    private boolean needUpdate;
-    private final Set<String> topics;
-
-    /**
-     * Create a metadata instance with reasonable defaults
-     */
-    public Metadata() {
-        this(100L, 60 * 60 * 1000L);
-    }
-
-    /**
-     * Create a new Metadata instance
-     * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy
-     *        polling
-     * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh
-     */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs) {
-        this.refreshBackoffMs = refreshBackoffMs;
-        this.metadataExpireMs = metadataExpireMs;
-        this.lastRefreshMs = 0L;
-        this.lastSuccessfulRefreshMs = 0L;
-        this.version = 0;
-        this.cluster = Cluster.empty();
-        this.needUpdate = false;
-        this.topics = new HashSet<String>();
-    }
-
-    /**
-     * Get the current cluster info without blocking
-     */
-    public synchronized Cluster fetch() {
-        return this.cluster;
-    }
-
-    /**
-     * Add the topic to maintain in the metadata
-     */
-    public synchronized void add(String topic) {
-        topics.add(topic);
-    }
-
-    /**
-     * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
-     * current info can be updated (i.e. backoff time has elapsed); if an update has been requested then the expiry time
-     * is now
-     */
-    public synchronized long timeToNextUpdate(long nowMs) {
-        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
-        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
-        return Math.max(timeToExpire, timeToAllowUpdate);
-    }
-
-    /**
-     * Request an update of the current cluster metadata info, return the current version before the update
-     */
-    public synchronized int requestUpdate() {
-        this.needUpdate = true;
-        return this.version;
-    }
-
-    /**
-     * Wait for metadata update until the current version is larger than the last version we know of
-     */
-    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
-        if (maxWaitMs < 0) {
-            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
-        }
-        long begin = System.currentTimeMillis();
-        long remainingWaitMs = maxWaitMs;
-        while (this.version <= lastVersion) {
-            if (remainingWaitMs != 0)
-                wait(remainingWaitMs);
-            long elapsed = System.currentTimeMillis() - begin;
-            if (elapsed >= maxWaitMs)
-                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
-            remainingWaitMs = maxWaitMs - elapsed;
-        }
-    }
-
-    /**
-     * Add one or more topics to maintain metadata for
-     */
-    public synchronized void addTopics(String... topics) {
-        for (String topic : topics)
-            this.topics.add(topic);
-        requestUpdate();
-    }
-
-    /**
-     * Get the list of topics we are currently maintaining metadata for
-     */
-    public synchronized Set<String> topics() {
-        return new HashSet<String>(this.topics);
-    }
-
-    /**
-     * Check if a topic is already in the topic set.
-     * @param topic topic to check
-     * @return true if the topic exists, false otherwise
-     */
-    public synchronized boolean containsTopic(String topic) {
-        return this.topics.contains(topic);
-    }
-
-    /**
-     * Update the cluster metadata
-     */
-    public synchronized void update(Cluster cluster, long now) {
-        this.needUpdate = false;
-        this.lastRefreshMs = now;
-        this.lastSuccessfulRefreshMs = now;
-        this.version += 1;
-        this.cluster = cluster;
-        notifyAll();
-        log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
-    }
-    
-    /**
-     * Record an attempt to update the metadata that failed. We need to keep track of this
-     * to avoid retrying immediately.
-     */
-    public synchronized void failedUpdate(long now) {
-        this.lastRefreshMs = now;
-    }
-    
-    /**
-     * @return The current metadata version
-     */
-    public synchronized int version() {
-        return this.version;
-    }
-
-    /**
-     * The last time metadata was successfully updated.
-     */
-    public synchronized long lastSuccessfulUpdate() {
-        return this.lastSuccessfulRefreshMs;
-    }
-
-    /**
-     * The metadata refresh backoff in ms
-     */
-    public long refreshBackoff() {
-        return refreshBackoffMs;
-    }
-}
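
A hedged sketch of the intended handshake between a caller thread and the network thread: the caller records the version returned by requestUpdate(), then blocks in awaitUpdate() until some other thread (in practice the client's sender thread) calls update() and bumps the version. Run standalone this would simply time out, since nothing performs the update; the 5000 ms timeout is a placeholder.

    import org.apache.kafka.copied.clients.Metadata;
    import org.apache.kafka.copied.common.Cluster;

    public class MetadataRefreshSketch {
        static Cluster refresh(Metadata metadata, String topic) throws InterruptedException {
            metadata.add(topic);
            int version = metadata.requestUpdate(); // mark dirty, remember the version
            metadata.awaitUpdate(version, 5000L);   // block until version > `version`
            return metadata.fetch();                // non-blocking read of the result
        }
    }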

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/NetworkClient.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/NetworkClient.java
deleted file mode 100644
index 2253ab4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/NetworkClient.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients;
-
-import org.apache.kafka.copied.common.Cluster;
-import org.apache.kafka.copied.common.Node;
-import org.apache.kafka.copied.common.network.NetworkReceive;
-import org.apache.kafka.copied.common.network.Selectable;
-import org.apache.kafka.copied.common.network.Send;
-import org.apache.kafka.copied.common.protocol.ApiKeys;
-import org.apache.kafka.copied.common.protocol.ProtoUtils;
-import org.apache.kafka.copied.common.protocol.types.Struct;
-import org.apache.kafka.copied.common.requests.MetadataRequest;
-import org.apache.kafka.copied.common.requests.MetadataResponse;
-import org.apache.kafka.copied.common.requests.RequestHeader;
-import org.apache.kafka.copied.common.requests.RequestSend;
-import org.apache.kafka.copied.common.requests.ResponseHeader;
-import org.apache.kafka.copied.common.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * A network client for asynchronous request/response network i/o. This is an internal class used to implement the
- * user-facing producer and consumer clients.
- * <p>
- * This class is not thread-safe!
- */
-public class NetworkClient implements KafkaClient {
-
-    private static final Logger log = LoggerFactory.getLogger(NetworkClient.class);
-
-    /* the selector used to perform network i/o */
-    private final Selectable selector;
-
-    /* the current cluster metadata */
-    private final Metadata metadata;
-
-    /* the state of each node's connection */
-    private final ClusterConnectionStates connectionStates;
-
-    /* the set of requests currently being sent or awaiting a response */
-    private final InFlightRequests inFlightRequests;
-
-    /* the socket send buffer size in bytes */
-    private final int socketSendBuffer;
-
-    /* the socket receive size buffer in bytes */
-    private final int socketReceiveBuffer;
-
-    /* the client id used to identify this client in requests to the server */
-    private final String clientId;
-
-    /* a random offset to use when choosing nodes to avoid having all clients choose the same node */
-    private final int nodeIndexOffset;
-
-    /* the current correlation id to use when sending requests to servers */
-    private int correlation;
-
-    /* true iff there is a metadata request that has been sent and for which we have not yet received a response */
-    private boolean metadataFetchInProgress;
-
-    /* the last timestamp when no broker node is available to connect */
-    private long lastNoNodeAvailableMs;
-
-    public NetworkClient(Selectable selector,
-                         Metadata metadata,
-                         String clientId,
-                         int maxInFlightRequestsPerConnection,
-                         long reconnectBackoffMs,
-                         int socketSendBuffer,
-                         int socketReceiveBuffer) {
-        this.selector = selector;
-        this.metadata = metadata;
-        this.clientId = clientId;
-        this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
-        this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
-        this.socketSendBuffer = socketSendBuffer;
-        this.socketReceiveBuffer = socketReceiveBuffer;
-        this.correlation = 0;
-        this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
-        this.metadataFetchInProgress = false;
-        this.lastNoNodeAvailableMs = 0;
-    }
-
-    /**
-     * Begin connecting to the given node, return true if we are already connected and ready to send to that node.
-     * 
-     * @param node The node to check
-     * @param now The current timestamp
-     * @return True if we are ready to send to the given node
-     */
-    @Override
-    public boolean ready(Node node, long now) {
-        if (isReady(node, now))
-            return true;
-
-        if (connectionStates.canConnect(node.idString(), now))
-            // if we are interested in sending to a node and we don't have a connection to it, initiate one
-            initiateConnect(node, now);
-
-        return false;
-    }
-
-    /**
-     * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When
-     * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled
-     * connections.
-     * 
-     * @param node The node to check
-     * @param now The current timestamp
-     * @return The number of milliseconds to wait.
-     */
-    @Override
-    public long connectionDelay(Node node, long now) {
-        return connectionStates.connectionDelay(node.idString(), now);
-    }
-
-    /**
-     * Check if the connection of the node has failed, based on the connection state. Such connection failures are
-     * usually transient and can be resumed in the next {@link #ready(Node, long)}
-     * call, but there are cases where transient failures need to be caught and reacted upon.
-     *
-     * @param node the node to check
-     * @return true iff the connection has failed and the node is disconnected
-     */
-    @Override
-    public boolean connectionFailed(Node node) {
-        return connectionStates.connectionState(node.idString()).equals(ConnectionState.DISCONNECTED);
-    }
-
-    /**
-     * Check if the node with the given id is ready to send more requests.
-     * 
-     * @param node The node
-     * @param now The current time in ms
-     * @return true if the node is ready
-     */
-    @Override
-    public boolean isReady(Node node, long now) {
-        String nodeId = node.idString();
-        if (!this.metadataFetchInProgress && this.metadata.timeToNextUpdate(now) == 0)
-            // if we need to update our metadata now, declare all requests unready so that metadata requests get
-            // first priority
-            return false;
-        else
-            // otherwise we are ready if we are connected and can send more requests
-            return isSendable(nodeId);
-    }
-
-    /**
-     * Are we connected and ready and able to send more requests to the given connection?
-     * 
-     * @param node The node
-     */
-    private boolean isSendable(String node) {
-        return connectionStates.isConnected(node) && inFlightRequests.canSendMore(node);
-    }
-
-    /**
-     * Return the state of the connection to the given node
-     * 
-     * @param node The node to check
-     * @return The connection state
-     */
-    public ConnectionState connectionState(String node) {
-        return connectionStates.connectionState(node);
-    }
-
-    /**
-     * Queue up the given request for sending. Requests can only be sent out to ready nodes.
-     * 
-     * @param request The request
-     */
-    @Override
-    public void send(ClientRequest request) {
-        String nodeId = request.request().destination();
-        if (!isSendable(nodeId))
-            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
-
-        this.inFlightRequests.add(request);
-        selector.send(request.request());
-    }
-
-    /**
-     * Do actual reads and writes to sockets.
-     * 
-     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately
-     * @param now The current time in milliseconds
-     * @return The list of responses received
-     */
-    @Override
-    public List<ClientResponse> poll(long timeout, long now) {
-        // should we update our metadata?
-        long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
-        long timeToNextReconnectAttempt = Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
-        long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
-        // if there is no node available to connect, back off refreshing metadata
-        long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
-                waitForMetadataFetch);
-        if (metadataTimeout == 0)
-            maybeUpdateMetadata(now);
-        // do the I/O
-        try {
-            this.selector.poll(Math.min(timeout, metadataTimeout));
-        } catch (IOException e) {
-            log.error("Unexpected error during I/O in producer network thread", e);
-        }
-
-        // process completed actions
-        List<ClientResponse> responses = new ArrayList<ClientResponse>();
-        handleCompletedSends(responses, now);
-        handleCompletedReceives(responses, now);
-        handleDisconnections(responses, now);
-        handleConnections();
-
-        // invoke callbacks
-        for (ClientResponse response : responses) {
-            if (response.request().hasCallback()) {
-                try {
-                    response.request().callback().onComplete(response);
-                } catch (Exception e) {
-                    log.error("Uncaught error in request completion:", e);
-                }
-            }
-        }
-
-        return responses;
-    }
-
-    /**
-     * Await all the outstanding responses for requests on the given connection
-     * 
-     * @param node The node to block on
-     * @param now The current time in ms
-     * @return All the collected responses
-     */
-    @Override
-    public List<ClientResponse> completeAll(String node, long now) {
-        try {
-            this.selector.muteAll();
-            this.selector.unmute(node);
-            List<ClientResponse> responses = new ArrayList<ClientResponse>();
-            while (inFlightRequestCount(node) > 0)
-                responses.addAll(poll(Integer.MAX_VALUE, now));
-            return responses;
-        } finally {
-            this.selector.unmuteAll();
-        }
-    }
-
-    /**
-     * Wait for all outstanding requests to complete.
-     */
-    @Override
-    public List<ClientResponse> completeAll(long now) {
-        List<ClientResponse> responses = new ArrayList<ClientResponse>();
-        while (inFlightRequestCount() > 0)
-            responses.addAll(poll(Integer.MAX_VALUE, now));
-        return responses;
-    }
-
-    /**
-     * Get the number of in-flight requests
-     */
-    @Override
-    public int inFlightRequestCount() {
-        return this.inFlightRequests.inFlightRequestCount();
-    }
-
-    /**
-     * Get the number of in-flight requests for a given node
-     */
-    @Override
-    public int inFlightRequestCount(String node) {
-        return this.inFlightRequests.inFlightRequestCount(node);
-    }
-
-    /**
-     * Generate a request header for the given API key
-     * 
-     * @param key The api key
-     * @return A request header with the appropriate client id and correlation id
-     */
-    @Override
-    public RequestHeader nextRequestHeader(ApiKeys key) {
-        return new RequestHeader(key.id, clientId, correlation++);
-    }
-
-    /**
-     * Interrupt the client if it is blocked waiting on I/O.
-     */
-    @Override
-    public void wakeup() {
-        this.selector.wakeup();
-    }
-
-    /**
-     * Close the network client
-     */
-    @Override
-    public void close() {
-        this.selector.close();
-    }
-
-    /**
-     * Choose the node with the fewest outstanding requests which is at least eligible for connection. This method will
-     * prefer a node with an existing connection, but will potentially choose a node for which we don't yet have a
-     * connection if all existing connections are in use. This method will never choose a node for which there is no
-     * existing connection and from which we have disconnected within the reconnect backoff period.
-     * 
-     * @return The node with the fewest in-flight requests.
-     */
-    public Node leastLoadedNode(long now) {
-        List<Node> nodes = this.metadata.fetch().nodes();
-        int inflight = Integer.MAX_VALUE;
-        Node found = null;
-        for (int i = 0; i < nodes.size(); i++) {
-            int idx = Utils.abs((this.nodeIndexOffset + i) % nodes.size());
-            Node node = nodes.get(idx);
-            int currInflight = this.inFlightRequests.inFlightRequestCount(node.idString());
-            if (currInflight == 0 && this.connectionStates.isConnected(node.idString())) {
-                // if we find an established connection with no in-flight requests we can stop right away
-                return node;
-            } else if (!this.connectionStates.isBlackedOut(node.idString(), now) && currInflight < inflight) {
-                // otherwise if this is the best we have found so far, record that
-                inflight = currInflight;
-                found = node;
-            }
-        }
-        return found;
-    }
-
-    /**
-     * Handle any completed request send. In particular, if no response is expected, consider the request complete.
-     * 
-     * @param responses The list of responses to update
-     * @param now The current time
-     */
-    private void handleCompletedSends(List<ClientResponse> responses, long now) {
-        // if no response is expected then when the send is completed, return it
-        for (Send send : this.selector.completedSends()) {
-            ClientRequest request = this.inFlightRequests.lastSent(send.destination());
-            if (!request.expectResponse()) {
-                this.inFlightRequests.completeLastSent(send.destination());
-                responses.add(new ClientResponse(request, now, false, null));
-            }
-        }
-    }
-
-    /**
-     * Handle any completed receives and update the response list with the responses received.
-     * 
-     * @param responses The list of responses to update
-     * @param now The current time
-     */
-    private void handleCompletedReceives(List<ClientResponse> responses, long now) {
-        for (NetworkReceive receive : this.selector.completedReceives()) {
-            String source = receive.source();
-            ClientRequest req = inFlightRequests.completeNext(source);
-            ResponseHeader header = ResponseHeader.parse(receive.payload());
-            short apiKey = req.request().header().apiKey();
-            Struct body = (Struct) ProtoUtils.currentResponseSchema(apiKey).read(receive.payload());
-            correlate(req.request().header(), header);
-            if (apiKey == ApiKeys.METADATA.id) {
-                handleMetadataResponse(req.request().header(), body, now);
-            } else {
-                // need to add body/header to response here
-                responses.add(new ClientResponse(req, now, false, body));
-            }
-        }
-    }
-
-    private void handleMetadataResponse(RequestHeader header, Struct body, long now) {
-        this.metadataFetchInProgress = false;
-        MetadataResponse response = new MetadataResponse(body);
-        Cluster cluster = response.cluster();
-        // check if any topics metadata failed to get updated
-        if (response.errors().size() > 0) {
-            log.warn("Error while fetching metadata with correlation id {} : {}", header.correlationId(), response.errors());
-        }
-        // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being
-        // created which means we will get errors and no nodes until it exists
-        if (cluster.nodes().size() > 0) {
-            this.metadata.update(cluster, now);
-        } else {
-            log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId());
-            this.metadata.failedUpdate(now);
-        }
-    }
-
-    /**
-     * Handle any disconnected connections
-     * 
-     * @param responses The list of responses that completed with the disconnection
-     * @param now The current time
-     */
-    private void handleDisconnections(List<ClientResponse> responses, long now) {
-        for (String node : this.selector.disconnected()) {
-            connectionStates.disconnected(node);
-            log.debug("Node {} disconnected.", node);
-            for (ClientRequest request : this.inFlightRequests.clearAll(node)) {
-                log.trace("Cancelled request {} due to node {} being disconnected", request, node);
-                ApiKeys requestKey = ApiKeys.forId(request.request().header().apiKey());
-                if (requestKey == ApiKeys.METADATA)
-                    metadataFetchInProgress = false;
-                else
-                    responses.add(new ClientResponse(request, now, true, null));
-            }
-        }
-        // we got a disconnect so we should probably refresh our metadata and see if that broker is dead
-        if (this.selector.disconnected().size() > 0)
-            this.metadata.requestUpdate();
-    }
-
-    /**
-     * Record any newly completed connections
-     */
-    private void handleConnections() {
-        for (String node : this.selector.connected()) {
-            log.debug("Completed connection to node {}", node);
-            this.connectionStates.connected(node);
-        }
-    }
-
-    /**
-     * Validate that the response corresponds to the request we expect or else explode
-     */
-    private void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {
-        if (requestHeader.correlationId() != responseHeader.correlationId())
-            throw new IllegalStateException("Correlation id for response (" + responseHeader.correlationId()
-                    + ") does not match request (" + requestHeader.correlationId() + ")");
-    }
-
-    /**
-     * Create a metadata request for the given topics
-     */
-    private ClientRequest metadataRequest(long now, String node, Set<String> topics) {
-        MetadataRequest metadata = new MetadataRequest(new ArrayList<String>(topics));
-        RequestSend send = new RequestSend(node, nextRequestHeader(ApiKeys.METADATA), metadata.toStruct());
-        return new ClientRequest(now, true, send, null);
-    }
-
-    /**
-     * Add a metadata request to the list of sends if we can make one
-     */
-    private void maybeUpdateMetadata(long now) {
-        // Beware that the behavior of this method and the computation of timeouts for poll() are
-        // highly dependent on the behavior of leastLoadedNode.
-        Node node = this.leastLoadedNode(now);
-        if (node == null) {
-            log.debug("Give up sending metadata request since no node is available");
-            // mark the timestamp for no node available to connect
-            this.lastNoNodeAvailableMs = now;
-            return;
-        }
-        String nodeConnectionId = node.idString();
-
-
-        if (connectionStates.isConnected(nodeConnectionId) && inFlightRequests.canSendMore(nodeConnectionId)) {
-            Set<String> topics = metadata.topics();
-            this.metadataFetchInProgress = true;
-            ClientRequest metadataRequest = metadataRequest(now, nodeConnectionId, topics);
-            log.debug("Sending metadata request {} to node {}", metadataRequest, node.id());
-            this.selector.send(metadataRequest.request());
-            this.inFlightRequests.add(metadataRequest);
-        } else if (connectionStates.canConnect(nodeConnectionId, now)) {
-            // we don't have a connection to this node right now, make one
-            log.debug("Initialize connection to node {} for sending metadata request", node.id());
-            initiateConnect(node, now);
-            // If initiateConnect failed immediately, this node will be put into blackout and we
-            // should allow immediately retrying in case there is another candidate node. If it
-            // is still connecting, the worst case is that we end up setting a longer timeout
-            // on the next round and then wait for the response.
-        } else { // connected, but can't send more OR connecting
-            // In either case, we just need to wait for a network event to let us know the selected
-            // connection might be usable again.
-            this.lastNoNodeAvailableMs = now;
-        }
-    }
-
-    /**
-     * Initiate a connection to the given node
-     */
-    private void initiateConnect(Node node, long now) {
-        String nodeConnectionId = node.idString();
-        try {
-            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
-            this.connectionStates.connecting(nodeConnectionId, now);
-            selector.connect(nodeConnectionId,
-                             new InetSocketAddress(node.host(), node.port()),
-                             this.socketSendBuffer,
-                             this.socketReceiveBuffer);
-        } catch (IOException e) {
-            /* attempt failed, we'll try again after the backoff */
-            connectionStates.disconnected(nodeConnectionId);
-            /* maybe the problem is our metadata, update it */
-            metadata.requestUpdate();
-            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
-        }
-    }
-
-}
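
One detail of leastLoadedNode() worth highlighting: the scan starts at a per-client random offset so that a fleet of clients does not converge on the same broker. Stripped of Kafka types, the index arithmetic looks like the sketch below; Utils.abs in the original guards against (nodeIndexOffset + i) overflowing to a negative value, and Math.abs serves the same role here since the remainder after the modulo can never be Integer.MIN_VALUE.

    import java.util.Random;

    public class OffsetScanSketch {
        public static void main(String[] args) {
            String[] nodes = {"broker-0", "broker-1", "broker-2"};
            int nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE);
            for (int i = 0; i < nodes.length; i++) {
                // same rotation order within one client, different start across clients
                int idx = Math.abs((nodeIndexOffset + i) % nodes.length);
                System.out.println("visit " + nodes[idx]);
            }
        }
    }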

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/RequestCompletionHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/RequestCompletionHandler.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/RequestCompletionHandler.java
deleted file mode 100644
index e698f9f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/RequestCompletionHandler.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients;
-
-/**
- * A callback interface for attaching an action to be executed when a request is complete and the corresponding response
- * has been received. This handler will also be invoked if there is a disconnection while handling the request.
- */
-public interface RequestCompletionHandler {
-
-    public void onComplete(ClientResponse response);
-
-}
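
A minimal implementing sketch. Note the doc above: the same callback also fires on disconnect, so implementations should distinguish the two cases. ClientResponse is not shown in this commit, so the wasDisconnected() accessor below is an assumption based on the boolean flag that NetworkClient passes to the ClientResponse constructor.

    import org.apache.kafka.copied.clients.ClientResponse;
    import org.apache.kafka.copied.clients.RequestCompletionHandler;

    public class LoggingCompletionHandler implements RequestCompletionHandler {
        @Override
        public void onComplete(ClientResponse response) {
            // wasDisconnected() is assumed here; see the note above.
            if (response.wasDisconnected()) {
                System.err.println("request aborted: connection lost");
            } else {
                System.out.println("request completed");
            }
        }
    }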

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/CommitType.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/CommitType.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/CommitType.java
deleted file mode 100644
index 90a3120..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/CommitType.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer;
-
-public enum CommitType {
-    SYNC, ASYNC
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/Consumer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/Consumer.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/Consumer.java
deleted file mode 100644
index 3d3e67b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/Consumer.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer;
-
-import org.apache.kafka.copied.common.Metric;
-import org.apache.kafka.copied.common.MetricName;
-import org.apache.kafka.copied.common.PartitionInfo;
-import org.apache.kafka.copied.common.TopicPartition;
-
-import java.io.Closeable;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * @see KafkaConsumer
- * @see MockConsumer
- */
-public interface Consumer<K, V> extends Closeable {
-    
-    /**
-     * @see KafkaConsumer#subscriptions()
-     */
-    public Set<TopicPartition> subscriptions();
-
-    /**
-     * @see KafkaConsumer#subscribe(String...)
-     */
-    public void subscribe(String... topics);
-
-    /**
-     * @see KafkaConsumer#subscribe(TopicPartition...)
-     */
-    public void subscribe(TopicPartition... partitions);
-
-    /**
-     * @see KafkaConsumer#unsubscribe(String...)
-     */
-    public void unsubscribe(String... topics);
-
-    /**
-     * @see KafkaConsumer#unsubscribe(TopicPartition...)
-     */
-    public void unsubscribe(TopicPartition... partitions);
-
-    /**
-     * @see KafkaConsumer#poll(long)
-     */
-    public ConsumerRecords<K, V> poll(long timeout);
-
-    /**
-     * @see KafkaConsumer#commit(CommitType)
-     */
-    public void commit(CommitType commitType);
-
-    /**
-     * @see KafkaConsumer#commit(CommitType, ConsumerCommitCallback)
-     */
-    public void commit(CommitType commitType, ConsumerCommitCallback callback);
-
-    /**
-     * @see KafkaConsumer#commit(Map, CommitType)
-     */
-    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType);
-
-    /**
-     * @see KafkaConsumer#commit(Map, CommitType, ConsumerCommitCallback)
-     */
-    public void commit(Map<TopicPartition, Long> offsets, CommitType commitType, ConsumerCommitCallback callback);
-
-    /**
-     * @see KafkaConsumer#seek(TopicPartition, long)
-     */
-    public void seek(TopicPartition partition, long offset);
-
-    /**
-     * @see KafkaConsumer#seekToBeginning(TopicPartition...)
-     */
-    public void seekToBeginning(TopicPartition... partitions);
-
-    /**
-     * @see KafkaConsumer#seekToEnd(TopicPartition...)
-     */
-    public void seekToEnd(TopicPartition... partitions);
-
-    /**
-     * @see KafkaConsumer#position(TopicPartition)
-     */
-    public long position(TopicPartition partition);
-
-    /**
-     * @see KafkaConsumer#committed(TopicPartition)
-     */
-    public long committed(TopicPartition partition);
-
-    /**
-     * @see KafkaConsumer#metrics()
-     */
-    public Map<MetricName, ? extends Metric> metrics();
-
-    /**
-     * @see KafkaConsumer#partitionsFor(String)
-     */
-    public List<PartitionInfo> partitionsFor(String topic);
-
-    /**
-     * @see KafkaConsumer#close()
-     */
-    public void close();
-
-    /**
-     * @see KafkaConsumer#wakeup()
-     */
-    public void wakeup();
-
-}
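
A sketch of a basic poll loop against this interface. The topic name is a placeholder, and the sketch assumes ConsumerRecords is iterable over ConsumerRecord with the usual key()/value() accessors, as in the upstream client:

    import org.apache.kafka.copied.clients.consumer.CommitType;
    import org.apache.kafka.copied.clients.consumer.Consumer;
    import org.apache.kafka.copied.clients.consumer.ConsumerRecord;
    import org.apache.kafka.copied.clients.consumer.ConsumerRecords;

    public class PollLoopSketch {
        static void run(Consumer<String, String> consumer) {
            consumer.subscribe("page-views");              // placeholder topic name
            try {
                while (true) {
                    // Block for up to 100 ms waiting for records.
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records)
                        handle(record.key(), record.value());
                    consumer.commit(CommitType.ASYNC);     // commit offsets of what was just consumed
                }
            } finally {
                consumer.close();
            }
        }

        static void handle(String key, String value) {
            // application logic goes here
        }
    }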

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerCommitCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerCommitCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerCommitCallback.java
deleted file mode 100644
index aa1460c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerCommitCallback.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer;
-
-import org.apache.kafka.copied.common.TopicPartition;
-
-import java.util.Map;
-
-/**
- * A callback interface that the user can implement to trigger custom actions when a commit request completes. The callback
- * may be executed in any thread calling {@link Consumer#poll(long) poll()}.
- */
-public interface ConsumerCommitCallback {
-
-    /**
-     * A callback method the user can implement to provide asynchronous handling of commit request completion.
-     * This method will be called when the commit request sent to the server has been acknowledged.
-     *
-     * @param offsets A map of the offsets that this callback applies to
-     * @param exception The exception thrown during processing of the request, or null if the commit completed successfully
-     */
-    void onComplete(Map<TopicPartition, Long> offsets, Exception exception);
-}
\ No newline at end of file
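
A small sketch of implementing this callback, for instance to log commit results (the class name is illustrative):

    import org.apache.kafka.copied.clients.consumer.ConsumerCommitCallback;
    import org.apache.kafka.copied.common.TopicPartition;

    import java.util.Map;

    public class LoggingCommitCallback implements ConsumerCommitCallback {
        @Override
        public void onComplete(Map<TopicPartition, Long> offsets, Exception exception) {
            if (exception != null) {
                // A real application might retry the commit or raise an alert here.
                System.err.println("Commit failed: " + exception);
            } else {
                for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet())
                    System.out.println("Committed " + entry.getKey() + " at offset " + entry.getValue());
            }
        }
    }

An instance would be passed to one of the asynchronous commit overloads, e.g. consumer.commit(CommitType.ASYNC, new LoggingCommitCallback()).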

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerConfig.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerConfig.java
deleted file mode 100644
index 0d810fe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerConfig.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer;
-
-import org.apache.kafka.copied.clients.CommonClientConfigs;
-import org.apache.kafka.copied.clients.consumer.internals.NoOpConsumerRebalanceCallback;
-import org.apache.kafka.copied.common.config.AbstractConfig;
-import org.apache.kafka.copied.common.config.ConfigDef;
-import org.apache.kafka.copied.common.config.ConfigDef.Importance;
-import org.apache.kafka.copied.common.config.ConfigDef.Type;
-import org.apache.kafka.copied.common.serialization.Deserializer;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.apache.kafka.copied.common.config.ConfigDef.Range.atLeast;
-import static org.apache.kafka.copied.common.config.ConfigDef.ValidString.in;
-
-/**
- * The consumer configuration keys
- */
-public class ConsumerConfig extends AbstractConfig {
-    private static final ConfigDef CONFIG;
-
-    /*
-     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
-     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
-     */
-
-    /**
-     * <code>group.id</code>
-     */
-    public static final String GROUP_ID_CONFIG = "group.id";
-    private static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality (by using <code>subscribe(topic)</code>) or the Kafka-based offset management strategy.";
-
-    /**
-     * <code>session.timeout.ms</code>
-     */
-    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
-    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
-
-    /**
-     * <code>bootstrap.servers</code>
-     */
-    public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
-
-    /**
-     * <code>enable.auto.commit</code>
-     */
-    public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
-    private static final String ENABLE_AUTO_COMMIT_DOC = "If true, the consumer's offset will be periodically committed in the background.";
-
-    /**
-     * <code>auto.commit.interval.ms</code>
-     */
-    public static final String AUTO_COMMIT_INTERVAL_MS_CONFIG = "auto.commit.interval.ms";
-    private static final String AUTO_COMMIT_INTERVAL_MS_DOC = "The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if <code>enable.auto.commit</code> is set to <code>true</code>.";
-
-    /**
-     * <code>partition.assignment.strategy</code>
-     */
-    public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
-    private static final String PARTITION_ASSIGNMENT_STRATEGY_DOC = "The friendly name of the partition assignment strategy that the server will use to distribute partition ownership amongst consumer instances when group management is used.";
-
-    /**
-     * <code>auto.offset.reset</code>
-     */
-    public static final String AUTO_OFFSET_RESET_CONFIG = "auto.offset.reset";
-    private static final String AUTO_OFFSET_RESET_DOC = "What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted): <ul><li>earliest: automatically reset the offset to the earliest offset</li><li>latest: automatically reset the offset to the latest offset</li><li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li></ul>";
-
-    /**
-     * <code>fetch.min.bytes</code>
-     */
-    public static final String FETCH_MIN_BYTES_CONFIG = "fetch.min.bytes";
-    private static final String FETCH_MIN_BYTES_DOC = "The minimum amount of data the server should return for a fetch request. If insufficient data is available, the server will wait for that much data to accumulate before answering the request. The default setting of 1 byte means that fetch requests are answered as soon as a single byte of data is available or the fetch request times out waiting for data to arrive. Setting this to something greater than 1 will cause the server to wait for larger amounts of data to accumulate, which can improve server throughput a bit at the cost of some additional latency.";
-
-    /**
-     * <code>fetch.max.wait.ms</code>
-     */
-    public static final String FETCH_MAX_WAIT_MS_CONFIG = "fetch.max.wait.ms";
-    private static final String FETCH_MAX_WAIT_MS_DOC = "The maximum amount of time the server will block before answering the fetch request if there isn't sufficient data to immediately satisfy the requirement given by fetch.min.bytes.";
-
-    /** <code>metadata.max.age.ms</code> */
-    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
-
-    /**
-     * <code>max.partition.fetch.bytes</code>
-     */
-    public static final String MAX_PARTITION_FETCH_BYTES_CONFIG = "max.partition.fetch.bytes";
-    private static final String MAX_PARTITION_FETCH_BYTES_DOC = "The maximum amount of data per-partition the server will return. The maximum total memory used for a request will be <code>#partitions * max.partition.fetch.bytes</code>. This size must be at least as large as the maximum message size the server allows or else it is possible for the producer to send messages larger than the consumer can fetch. If that happens, the consumer can get stuck trying to fetch a large message on a certain partition.";
-
-    /** <code>send.buffer.bytes</code> */
-    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
-
-    /** <code>receive.buffer.bytes</code> */
-    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
-
-    /**
-     * <code>client.id</code>
-     */
-    public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
-
-    /**
-     * <code>reconnect.backoff.ms</code>
-     */
-    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
-
-    /**
-     * <code>retry.backoff.ms</code>
-     */
-    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
-
-    /**
-     * <code>metrics.sample.window.ms</code>
-     */
-    public static final String METRICS_SAMPLE_WINDOW_MS_CONFIG = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG;
-
-    /**
-     * <code>metrics.num.samples</code>
-     */
-    public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
-
-    /**
-     * <code>metric.reporters</code>
-     */
-    public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
-
-    /**
-     * <code>rebalance.callback.class</code>
-     */
-    public static final String CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG = "rebalance.callback.class";
-    private static final String CONSUMER_REBALANCE_CALLBACK_CLASS_DOC = "A user-provided callback to execute when partition assignments change.";
-
-    /**
-     * <code>check.crcs</code>
-     */
-    public static final String CHECK_CRCS_CONFIG = "check.crcs";
-    private static final String CHECK_CRCS_DOC = "Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance.";
-    
-    /** <code>key.deserializer</code> */
-    public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer";
-    private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface.";
-
-    /** <code>value.deserializer</code> */
-    public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer";
-    private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface.";
-
-    /** <code>connections.max.idle.ms</code> */
-    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
-
-
-    static {
-        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
-                                        Type.LIST,
-                                        Importance.HIGH,
-                                        CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
-                                .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
-                                .define(SESSION_TIMEOUT_MS_CONFIG,
-                                        Type.INT,
-                                        30000,
-                                        Importance.HIGH,
-                                        SESSION_TIMEOUT_MS_DOC)
-                                .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
-                                        Type.STRING,
-                                        "range",
-                                        in("range", "roundrobin"),
-                                        Importance.MEDIUM,
-                                        PARTITION_ASSIGNMENT_STRATEGY_DOC)
-                                .define(METADATA_MAX_AGE_CONFIG,
-                                        Type.LONG,
-                                        5 * 60 * 1000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
-                                .define(ENABLE_AUTO_COMMIT_CONFIG,
-                                        Type.BOOLEAN,
-                                        true,
-                                        Importance.MEDIUM,
-                                        ENABLE_AUTO_COMMIT_DOC)
-                                .define(AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
-                                        5000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        AUTO_COMMIT_INTERVAL_MS_DOC)
-                                .define(CLIENT_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.LOW,
-                                        CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
-                                        Type.INT,
-                                        1 * 1024 * 1024,
-                                        atLeast(0),
-                                        Importance.HIGH,
-                                        MAX_PARTITION_FETCH_BYTES_DOC)
-                                .define(SEND_BUFFER_CONFIG,
-                                        Type.INT,
-                                        128 * 1024,
-                                        atLeast(0),
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.SEND_BUFFER_DOC)
-                                .define(RECEIVE_BUFFER_CONFIG,
-                                        Type.INT,
-                                        32 * 1024,
-                                        atLeast(0),
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
-                                .define(FETCH_MIN_BYTES_CONFIG,
-                                        Type.INT,
-                                        1024,
-                                        atLeast(0),
-                                        Importance.HIGH,
-                                        FETCH_MIN_BYTES_DOC)
-                                .define(FETCH_MAX_WAIT_MS_CONFIG,
-                                        Type.INT,
-                                        500,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        FETCH_MAX_WAIT_MS_DOC)
-                                .define(RECONNECT_BACKOFF_MS_CONFIG,
-                                        Type.LONG,
-                                        50L,
-                                        atLeast(0L),
-                                        Importance.LOW,
-                                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
-                                .define(RETRY_BACKOFF_MS_CONFIG,
-                                        Type.LONG,
-                                        100L,
-                                        atLeast(0L),
-                                        Importance.LOW,
-                                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
-                                .define(AUTO_OFFSET_RESET_CONFIG,
-                                        Type.STRING,
-                                        "latest",
-                                        in("latest", "earliest", "none"),
-                                        Importance.MEDIUM,
-                                        AUTO_OFFSET_RESET_DOC)
-                                .define(CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        NoOpConsumerRebalanceCallback.class,
-                                        Importance.LOW,
-                                        CONSUMER_REBALANCE_CALLBACK_CLASS_DOC)
-                                .define(CHECK_CRCS_CONFIG,
-                                        Type.BOOLEAN,
-                                        true,
-                                        Importance.LOW,
-                                        CHECK_CRCS_DOC)                                
-                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG,
-                                        Type.INT,
-                                        2,
-                                        atLeast(1),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG,
-                                        Type.LIST,
-                                        "",
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                                .define(KEY_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        KEY_DESERIALIZER_CLASS_DOC)
-                                .define(VALUE_DESERIALIZER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Importance.HIGH,
-                                        VALUE_DESERIALIZER_CLASS_DOC)
-                                /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
-                                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
-                                        Type.LONG,
-                                        9 * 60 * 1000,
-                                        Importance.MEDIUM,
-                                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC);
-    }
-
-    public static Map<String, Object> addDeserializerToConfig(Map<String, Object> configs,
-                                                              Deserializer<?> keyDeserializer,
-                                                              Deserializer<?> valueDeserializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
-        newConfigs.putAll(configs);
-        if (keyDeserializer != null)
-            newConfigs.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass());
-        if (valueDeserializer != null)
-            newConfigs.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass());
-        return newConfigs;
-    }
-
-    public static Properties addDeserializerToConfig(Properties properties,
-                                                     Deserializer<?> keyDeserializer,
-                                                     Deserializer<?> valueDeserializer) {
-        Properties newProperties = new Properties();
-        newProperties.putAll(properties);
-        if (keyDeserializer != null)
-            newProperties.put(KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getClass().getName());
-        if (valueDeserializer != null)
-            newProperties.put(VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getClass().getName());
-        return newProperties;
-    }
-
-    public ConsumerConfig(Map<?, ?> props) {
-        super(CONFIG, props);
-    }
-
-    public static void main(String[] args) {
-        System.out.println(CONFIG.toHtmlTable());
-    }
-
-}
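
A sketch of assembling a minimal configuration from these keys. The broker addresses and group id are placeholders, and the sketch assumes a StringDeserializer was backported alongside the Deserializer interface under the copied serialization package:

    import org.apache.kafka.copied.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    public class ConsumerConfigSketch {
        static ConsumerConfig minimalConfig() {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092"); // placeholder hosts
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "page-view-counters");                 // placeholder group id
            // Assumed backported deserializer class; swap in your own Deserializer if absent.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.copied.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.copied.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // commit manually instead
            return new ConsumerConfig(props);
        }
    }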

http://git-wip-us.apache.org/repos/asf/flink/blob/33f4c818/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerRebalanceCallback.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerRebalanceCallback.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerRebalanceCallback.java
deleted file mode 100644
index 7ab1cd3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/kafka/copied/clients/consumer/ConsumerRebalanceCallback.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.copied.clients.consumer;
-
-import org.apache.kafka.copied.common.TopicPartition;
-
-import java.util.Collection;
-
-/**
- * A callback interface that the user can implement to trigger custom actions when the set of partitions assigned to the
- * consumer changes.
- * <p>
- * This is applicable when the consumer has Kafka auto-manage its group membership; if the consumer subscribes directly to specific partitions,
- * those partitions will never be reassigned and this callback is not applicable.
- * <p>
- * When Kafka is managing the group membership, a partition re-assignment will be triggered any time the members of the group change or the subscription
- * of the members changes. This can occur when processes die, when new process instances are added, or when old instances come back to life after failure.
- * <p>
- * There are many uses for this functionality. One common use is saving offsets in a custom store. By saving offsets in
- * the {@link #onPartitionsRevoked(Consumer, Collection)} call we can ensure that any time partition assignment changes
- * the offset gets saved.
- * <p>
- * Another use is flushing out any kind of cache of intermediate results the consumer may be keeping. For example,
- * consider a case where the consumer is subscribed to a topic containing user page views, and the goal is to count the
- * number of page views per user for each five-minute window. Let's say the topic is partitioned by the user id so that
- * all events for a particular user will go to a single consumer instance. The consumer can keep in memory a running
- * tally of actions per user and only flush these out to a remote data store when its cache gets too big. However, if a
- * partition is reassigned, it may want to automatically trigger a flush of this cache before the new owner takes over
- * consumption.
- * <p>
- * This callback will execute in the user thread as part of the {@link Consumer#poll(long) poll(long)} call whenever partition assignment changes.
- * <p>
- * It is guaranteed that all consumer processes will invoke {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} prior to 
- * any process invoking {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned}. So if offsets or other state is saved in the 
- * {@link #onPartitionsRevoked(Consumer, Collection) onPartitionsRevoked} call it is guaranteed to be saved by the time the process taking over that
- * partition has their {@link #onPartitionsAssigned(Consumer, Collection) onPartitionsAssigned} callback called to load the state.
- * <p>
- * Here is pseudo-code for a callback implementation for saving offsets:
- * <pre>
- * {@code
- *   public class SaveOffsetsOnRebalance implements ConsumerRebalanceCallback {
- *       public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- *           // read the offsets from an external store using some custom code not described here
- *           for(TopicPartition partition: partitions)
- *              consumer.seek(partition, readOffsetFromExternalStore(partition));
- *       }      
- *       public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
- *           // save the offsets in an external store using some custom code not described here
- *           for(TopicPartition partition: partitions)
- *              saveOffsetInExternalStore(consumer.position(partition));
- *       }
- *   }
- * }
- * </pre>
- */
-public interface ConsumerRebalanceCallback {
-
-    /**
-     * A callback method the user can implement to provide handling of customized offsets on completion of a successful
-     * partition re-assignment. This method will be called after a partition re-assignment completes and before the
-     * consumer starts fetching data.
-     * <p>
-     * It is guaranteed that all the processes in a consumer group will execute their
-     * {@link #onPartitionsRevoked(Consumer, Collection)} callback before any instance executes its
-     * {@link #onPartitionsAssigned(Consumer, Collection)} callback.
-     *
-     * @param consumer Reference to the consumer for convenience
-     * @param partitions The list of partitions that are now assigned to the consumer (may include partitions previously
-     *            assigned to the consumer)
-     */
-    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
-
-    /**
-     * A callback method the user can implement to provide handling of offset commits to a customized store on the start
-     * of a rebalance operation. This method will be called before a rebalance operation starts and after the consumer
-     * stops fetching data. It is recommended that offsets be committed in this callback to either Kafka or a
-     * custom offset store to prevent duplicate data.
-     * <p>
-     * For examples on usage of this API, see Usage Examples section of {@link KafkaConsumer KafkaConsumer}
-     *
-     * @param consumer  Reference to the consumer for convenience
-     * @param partitions The list of partitions that were assigned to the consumer on the last rebalance
-     */
-    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
-}
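
A compilable variant of the SaveOffsetsOnRebalance pseudo-code above, using an in-memory map as a stand-in for the external store (a real deployment would persist to a database or similar). It implements the two methods of this interface and restores positions with seek(TopicPartition, long):

    import org.apache.kafka.copied.clients.consumer.Consumer;
    import org.apache.kafka.copied.clients.consumer.ConsumerRebalanceCallback;
    import org.apache.kafka.copied.common.TopicPartition;

    import java.util.Collection;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class InMemoryOffsetStoreCallback implements ConsumerRebalanceCallback {
        // Stand-in for an external offset store.
        private final Map<TopicPartition, Long> store = new ConcurrentHashMap<TopicPartition, Long>();

        @Override
        public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions) {
                Long offset = store.get(partition);
                if (offset != null)
                    consumer.seek(partition, offset); // resume from the saved position
            }
        }

        @Override
        public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
            for (TopicPartition partition : partitions)
                store.put(partition, consumer.position(partition)); // save position before losing ownership
        }
    }

Such a class could be registered through the rebalance.callback.class key (ConsumerConfig.CONSUMER_REBALANCE_CALLBACK_CLASS_CONFIG), which the ConsumerConfig above defines with Type.CLASS and a no-op default.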

