flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [40/51] [abbrv] flink git commit: [FLINK-2386] [kafka connector] Remove copied Kafka code again. Implemented our own topic metadata retrieval.
Date Thu, 27 Aug 2015 11:25:57 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
deleted file mode 100644
index f797ebe..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchRequest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class FetchRequest extends AbstractRequest {
-    
-    public static final int CONSUMER_REPLICA_ID = -1;
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.FETCH.id);
-    private static final String REPLICA_ID_KEY_NAME = "replica_id";
-    private static final String MAX_WAIT_KEY_NAME = "max_wait_time";
-    private static final String MIN_BYTES_KEY_NAME = "min_bytes";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String FETCH_OFFSET_KEY_NAME = "fetch_offset";
-    private static final String MAX_BYTES_KEY_NAME = "max_bytes";
-
-    private final int replicaId;
-    private final int maxWait;
-    private final int minBytes;
-    private final Map<TopicPartition, PartitionData> fetchData;
-
-    public static final class PartitionData {
-        public final long offset;
-        public final int maxBytes;
-
-        public PartitionData(long offset, int maxBytes) {
-            this.offset = offset;
-            this.maxBytes = maxBytes;
-        }
-    }
-
-    /**
-     * Create a non-replica fetch request
-     */
-    public FetchRequest(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        this(CONSUMER_REPLICA_ID, maxWait, minBytes, fetchData);
-    }
-
-    /**
-     * Create a replica fetch request
-     */
-    public FetchRequest(int replicaId, int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(fetchData);
-
-        struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        struct.set(MAX_WAIT_KEY_NAME, maxWait);
-        struct.set(MIN_BYTES_KEY_NAME, minBytes);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry : topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(FETCH_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                partitionData.set(MAX_BYTES_KEY_NAME, fetchPartitionData.maxBytes);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        this.replicaId = replicaId;
-        this.maxWait = maxWait;
-        this.minBytes = minBytes;
-        this.fetchData = fetchData;
-    }
-
-    public FetchRequest(Struct struct) {
-        super(struct);
-        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
-        maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
-        minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
-        fetchData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                long offset = partitionResponse.getLong(FETCH_OFFSET_KEY_NAME);
-                int maxBytes = partitionResponse.getInt(MAX_BYTES_KEY_NAME);
-                PartitionData partitionData = new PartitionData(offset, maxBytes);
-                fetchData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, FetchResponse.PartitionData> responseData = new HashMap<TopicPartition, FetchResponse.PartitionData>();
-
-        for (Map.Entry<TopicPartition, PartitionData> entry: fetchData.entrySet()) {
-            FetchResponse.PartitionData partitionResponse = new FetchResponse.PartitionData(Errors.forException(e).code(),
-                    FetchResponse.INVALID_HIGHWATERMARK,
-                    FetchResponse.EMPTY_RECORD_SET);
-            responseData.put(entry.getKey(), partitionResponse);
-        }
-
-        switch (versionId) {
-            case 0:
-                return new FetchResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.FETCH.id)));
-        }
-    }
-
-    public int replicaId() {
-        return replicaId;
-    }
-
-    public int maxWait() {
-        return maxWait;
-    }
-
-    public int minBytes() {
-        return minBytes;
-    }
-
-    public Map<TopicPartition, PartitionData> fetchData() {
-        return fetchData;
-    }
-
-    public static FetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, versionId, buffer));
-    }
-
-    public static FetchRequest parse(ByteBuffer buffer) {
-        return new FetchRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
deleted file mode 100644
index 158833e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/FetchResponse.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class FetchResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.FETCH.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     *  OFFSET_OUT_OF_RANGE (1)
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)
-     *  NOT_LEADER_FOR_PARTITION (6)
-     *  REPLICA_NOT_AVAILABLE (9)
-     *  UNKNOWN (-1)
-     */
-
-    private static final String HIGH_WATERMARK_KEY_NAME = "high_watermark";
-    private static final String RECORD_SET_KEY_NAME = "record_set";
-
-    public static final long INVALID_HIGHWATERMARK = -1L;
-    public static final ByteBuffer EMPTY_RECORD_SET = ByteBuffer.allocate(0);
-
-    private final Map<TopicPartition, PartitionData> responseData;
-
-    public static final class PartitionData {
-        public final short errorCode;
-        public final long highWatermark;
-        public final ByteBuffer recordSet;
-
-        public PartitionData(short errorCode, long highWatermark, ByteBuffer recordSet) {
-            this.errorCode = errorCode;
-            this.highWatermark = highWatermark;
-            this.recordSet = recordSet;
-        }
-    }
-
-    public FetchResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, fetchPartitionData.errorCode);
-                partitionData.set(HIGH_WATERMARK_KEY_NAME, fetchPartitionData.highWatermark);
-                partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.recordSet);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public FetchResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                long highWatermark = partitionResponse.getLong(HIGH_WATERMARK_KEY_NAME);
-                ByteBuffer recordSet = partitionResponse.getBytes(RECORD_SET_KEY_NAME);
-                PartitionData partitionData = new PartitionData(errorCode, highWatermark, recordSet);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    public Map<TopicPartition, PartitionData> responseData() {
-        return responseData;
-    }
-
-    public static FetchResponse parse(ByteBuffer buffer) {
-        return new FetchResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
deleted file mode 100644
index c8abb67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatRequest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class HeartbeatRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.HEARTBEAT.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GROUP_GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-
-    private final String groupId;
-    private final int groupGenerationId;
-    private final String consumerId;
-
-    public HeartbeatRequest(String groupId, int groupGenerationId, String consumerId) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(GROUP_GENERATION_ID_KEY_NAME, groupGenerationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        this.groupId = groupId;
-        this.groupGenerationId = groupGenerationId;
-        this.consumerId = consumerId;
-    }
-
-    public HeartbeatRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        groupGenerationId = struct.getInt(GROUP_GENERATION_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new HeartbeatResponse(Errors.forException(e).code());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.HEARTBEAT.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public int groupGenerationId() {
-        return groupGenerationId;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public static HeartbeatRequest parse(ByteBuffer buffer, int versionId) {
-        return new HeartbeatRequest(ProtoUtils.parseRequest(ApiKeys.HEARTBEAT.id, versionId, buffer));
-    }
-
-    public static HeartbeatRequest parse(ByteBuffer buffer) {
-        return new HeartbeatRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
deleted file mode 100644
index 4bf6669..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/HeartbeatResponse.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-
-import java.nio.ByteBuffer;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class HeartbeatResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.HEARTBEAT.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * ILLEGAL_GENERATION (22)
-     * UNKNOWN_CONSUMER_ID (25)
-     */
-
-    private final short errorCode;
-    public HeartbeatResponse(short errorCode) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        this.errorCode = errorCode;
-    }
-
-    public HeartbeatResponse(Struct struct) {
-        super(struct);
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public static HeartbeatResponse parse(ByteBuffer buffer) {
-        return new HeartbeatResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
deleted file mode 100644
index f098d18..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupRequest.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class JoinGroupRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.JOIN_GROUP.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String SESSION_TIMEOUT_KEY_NAME = "session_timeout";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String STRATEGY_KEY_NAME = "partition_assignment_strategy";
-
-    public static final String UNKNOWN_CONSUMER_ID = "";
-
-    private final String groupId;
-    private final int sessionTimeout;
-    private final List<String> topics;
-    private final String consumerId;
-    private final String strategy;
-
-    public JoinGroupRequest(String groupId, int sessionTimeout, List<String> topics, String consumerId, String strategy) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        struct.set(SESSION_TIMEOUT_KEY_NAME, sessionTimeout);
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        struct.set(STRATEGY_KEY_NAME, strategy);
-        this.groupId = groupId;
-        this.sessionTimeout = sessionTimeout;
-        this.topics = topics;
-        this.consumerId = consumerId;
-        this.strategy = strategy;
-    }
-
-    public JoinGroupRequest(Struct struct) {
-        super(struct);
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        sessionTimeout = struct.getInt(SESSION_TIMEOUT_KEY_NAME);
-        Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<String>();
-        for (Object topic: topicsArray)
-            topics.add((String) topic);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-        strategy = struct.getString(STRATEGY_KEY_NAME);
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        switch (versionId) {
-            case 0:
-                return new JoinGroupResponse(
-                        Errors.forException(e).code(),
-                        JoinGroupResponse.UNKNOWN_GENERATION_ID,
-                        JoinGroupResponse.UNKNOWN_CONSUMER_ID,
-                        Collections.<TopicPartition>emptyList());
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.JOIN_GROUP.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public int sessionTimeout() {
-        return sessionTimeout;
-    }
-
-    public List<String> topics() {
-        return topics;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public String strategy() {
-        return strategy;
-    }
-
-    public static JoinGroupRequest parse(ByteBuffer buffer, int versionId) {
-        return new JoinGroupRequest(ProtoUtils.parseRequest(ApiKeys.JOIN_GROUP.id, versionId, buffer));
-    }
-
-    public static JoinGroupRequest parse(ByteBuffer buffer) {
-        return new JoinGroupRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
deleted file mode 100644
index 7d9b647..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/JoinGroupResponse.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class JoinGroupResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.JOIN_GROUP.id);
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * INCONSISTENT_PARTITION_ASSIGNMENT_STRATEGY (23)
-     * UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY (24)
-     * UNKNOWN_CONSUMER_ID (25)
-     * INVALID_SESSION_TIMEOUT (26)
-     */
-
-    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String ASSIGNED_PARTITIONS_KEY_NAME = "assigned_partitions";
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    public static final int UNKNOWN_GENERATION_ID = -1;
-    public static final String UNKNOWN_CONSUMER_ID = "";
-
-    private final short errorCode;
-    private final int generationId;
-    private final String consumerId;
-    private final List<TopicPartition> assignedPartitions;
-
-    public JoinGroupResponse(short errorCode, int generationId, String consumerId, List<TopicPartition> assignedPartitions) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        Map<String, List<Integer>> partitionsByTopic = CollectionUtils.groupDataByTopic(assignedPartitions);
-
-        struct.set(ERROR_CODE_KEY_NAME, errorCode);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, List<Integer>> entries: partitionsByTopic.entrySet()) {
-            Struct topicData = struct.instance(ASSIGNED_PARTITIONS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            topicData.set(PARTITIONS_KEY_NAME, entries.getValue().toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(ASSIGNED_PARTITIONS_KEY_NAME, topicArray.toArray());
-
-        this.errorCode = errorCode;
-        this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.assignedPartitions = assignedPartitions;
-    }
-
-    public JoinGroupResponse(Struct struct) {
-        super(struct);
-        assignedPartitions = new ArrayList<TopicPartition>();
-        for (Object topicDataObj : struct.getArray(ASSIGNED_PARTITIONS_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
-            for (Object partitionObj : topicData.getArray(PARTITIONS_KEY_NAME))
-                assignedPartitions.add(new TopicPartition(topic, (Integer) partitionObj));
-        }
-        errorCode = struct.getShort(ERROR_CODE_KEY_NAME);
-        generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-    }
-
-    public short errorCode() {
-        return errorCode;
-    }
-
-    public int generationId() {
-        return generationId;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public List<TopicPartition> assignedPartitions() {
-        return assignedPartitions;
-    }
-
-    public static JoinGroupResponse parse(ByteBuffer buffer) {
-        return new JoinGroupResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
deleted file mode 100644
index 069e06d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetRequest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ListOffsetRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id);
-    private static final String REPLICA_ID_KEY_NAME = "replica_id";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String TIMESTAMP_KEY_NAME = "timestamp";
-    private static final String MAX_NUM_OFFSETS_KEY_NAME = "max_num_offsets";
-
-    private final int replicaId;
-    private final Map<TopicPartition, PartitionData> offsetData;
-
-    public static final class PartitionData {
-        public final long timestamp;
-        public final int maxNumOffsets;
-
-        public PartitionData(long timestamp, int maxNumOffsets) {
-            this.timestamp = timestamp;
-            this.maxNumOffsets = maxNumOffsets;
-        }
-    }
-    
-    public ListOffsetRequest(Map<TopicPartition, PartitionData> offsetData) {
-        this(-1, offsetData);
-    }
-
-    public ListOffsetRequest(int replicaId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
-
-        struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(TIMESTAMP_KEY_NAME, offsetPartitionData.timestamp);
-                partitionData.set(MAX_NUM_OFFSETS_KEY_NAME, offsetPartitionData.maxNumOffsets);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-        this.replicaId = replicaId;
-        this.offsetData = offsetData;
-    }
-
-    public ListOffsetRequest(Struct struct) {
-        super(struct);
-        replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
-        offsetData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
-                int maxNumOffsets = partitionResponse.getInt(MAX_NUM_OFFSETS_KEY_NAME);
-                PartitionData partitionData = new PartitionData(timestamp, maxNumOffsets);
-                offsetData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
-
-        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
-            ListOffsetResponse.PartitionData partitionResponse = new ListOffsetResponse.PartitionData(Errors.forException(e).code(), new ArrayList<Long>());
-            responseData.put(entry.getKey(), partitionResponse);
-        }
-
-        switch (versionId) {
-            case 0:
-                return new ListOffsetResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.LIST_OFFSETS.id)));
-        }
-    }
-
-    public int replicaId() {
-        return replicaId;
-    }
-
-    public Map<TopicPartition, PartitionData> offsetData() {
-        return offsetData;
-    }
-
-    public static ListOffsetRequest parse(ByteBuffer buffer, int versionId) {
-        return new ListOffsetRequest(ProtoUtils.parseRequest(ApiKeys.LIST_OFFSETS.id, versionId, buffer));
-    }
-
-    public static ListOffsetRequest parse(ByteBuffer buffer) {
-        return new ListOffsetRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
deleted file mode 100644
index b831f61..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/ListOffsetResponse.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class ListOffsetResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.LIST_OFFSETS.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)
-     *  NOT_LEADER_FOR_PARTITION (6)
-     *  UNKNOWN (-1)
-     */
-
-    private static final String OFFSETS_KEY_NAME = "offsets";
-
-    private final Map<TopicPartition, PartitionData> responseData;
-
-    public static final class PartitionData {
-        public final short errorCode;
-        public final List<Long> offsets;
-
-        public PartitionData(short errorCode, List<Long> offsets) {
-            this.errorCode = errorCode;
-            this.offsets = offsets;
-        }
-    }
-
-    public ListOffsetResponse(Map<TopicPartition, PartitionData> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData offsetPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, offsetPartitionData.errorCode);
-                partitionData.set(OFFSETS_KEY_NAME, offsetPartitionData.offsets.toArray());
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public ListOffsetResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                Object[] offsets = partitionResponse.getArray(OFFSETS_KEY_NAME);
-                List<Long> offsetsList = new ArrayList<Long>();
-                for (Object offset: offsets)
-                    offsetsList.add((Long) offset);
-                PartitionData partitionData = new PartitionData(errorCode, offsetsList);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-    }
-
-    public Map<TopicPartition, PartitionData> responseData() {
-        return responseData;
-    }
-
-    public static ListOffsetResponse parse(ByteBuffer buffer) {
-        return new ListOffsetResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
deleted file mode 100644
index 2820fcd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataRequest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.Node;
-import org.apache.flink.kafka_backport.common.PartitionInfo;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.Cluster;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class MetadataRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.METADATA.id);
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    private final List<String> topics;
-
-    public MetadataRequest(List<String> topics) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        this.topics = topics;
-    }
-
-    public MetadataRequest(Struct struct) {
-        super(struct);
-        Object[] topicArray = struct.getArray(TOPICS_KEY_NAME);
-        topics = new ArrayList<String>();
-        for (Object topicObj: topicArray) {
-            topics.add((String) topicObj);
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<String, Errors> topicErrors = new HashMap<String, Errors>();
-        for (String topic : topics) {
-            topicErrors.put(topic, Errors.forException(e));
-        }
-
-        Cluster cluster = new Cluster(new ArrayList<Node>(), new ArrayList<PartitionInfo>());
-        switch (versionId) {
-            case 0:
-                return new MetadataResponse(cluster, topicErrors);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.METADATA.id)));
-        }
-    }
-
-    public List<String> topics() {
-        return topics;
-    }
-
-    public static MetadataRequest parse(ByteBuffer buffer, int versionId) {
-        return new MetadataRequest(ProtoUtils.parseRequest(ApiKeys.METADATA.id, versionId, buffer));
-    }
-
-    public static MetadataRequest parse(ByteBuffer buffer) {
-        return new MetadataRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
deleted file mode 100644
index 83d7290..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/MetadataResponse.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.Node;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.Cluster;
-import org.apache.flink.kafka_backport.common.PartitionInfo;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class MetadataResponse extends AbstractRequestResponse {
-
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.METADATA.id);
-    private static final String BROKERS_KEY_NAME = "brokers";
-    private static final String TOPIC_METATDATA_KEY_NAME = "topic_metadata";
-
-    // broker level field names
-    private static final String NODE_ID_KEY_NAME = "node_id";
-    private static final String HOST_KEY_NAME = "host";
-    private static final String PORT_KEY_NAME = "port";
-
-    // topic level field names
-    private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
-
-    /**
-     * Possible error code:
-     *
-     * TODO
-     */
-
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITION_METADATA_KEY_NAME = "partition_metadata";
-
-    // partition level field names
-    private static final String PARTITION_ERROR_CODE_KEY_NAME = "partition_error_code";
-
-    /**
-     * Possible error code:
-     *
-     * TODO
-     */
-
-    private static final String PARTITION_KEY_NAME = "partition_id";
-    private static final String LEADER_KEY_NAME = "leader";
-    private static final String REPLICAS_KEY_NAME = "replicas";
-    private static final String ISR_KEY_NAME = "isr";
-
-    private final Cluster cluster;
-    private final Map<String, Errors> errors;
-
-    /**
-     * Constructor for MetadataResponse where there are errors for some of the topics,
-     * error data take precedence over cluster information for particular topic
-     */
-    public MetadataResponse(Cluster cluster, Map<String, Errors> errors) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        List<Struct> brokerArray = new ArrayList<Struct>();
-        for (Node node : cluster.nodes()) {
-            Struct broker = struct.instance(BROKERS_KEY_NAME);
-            broker.set(NODE_ID_KEY_NAME, node.id());
-            broker.set(HOST_KEY_NAME, node.host());
-            broker.set(PORT_KEY_NAME, node.port());
-            brokerArray.add(broker);
-        }
-        struct.set(BROKERS_KEY_NAME, brokerArray.toArray());
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (String topic : cluster.topics()) {
-            Struct topicData = struct.instance(TOPIC_METATDATA_KEY_NAME);
-
-            topicData.set(TOPIC_KEY_NAME, topic);
-            if (errors.containsKey(topic)) {
-                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, errors.get(topic).code());
-            } else {
-                topicData.set(TOPIC_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                List<Struct> partitionArray = new ArrayList<Struct>();
-                for (PartitionInfo fetchPartitionData : cluster.partitionsForTopic(topic)) {
-                    Struct partitionData = topicData.instance(PARTITION_METADATA_KEY_NAME);
-                    partitionData.set(PARTITION_ERROR_CODE_KEY_NAME, Errors.NONE.code());
-                    partitionData.set(PARTITION_KEY_NAME, fetchPartitionData.partition());
-                    partitionData.set(LEADER_KEY_NAME, fetchPartitionData.leader().id());
-                    ArrayList<Integer> replicas = new ArrayList<Integer>();
-                    for (Node node : fetchPartitionData.replicas())
-                        replicas.add(node.id());
-                    partitionData.set(REPLICAS_KEY_NAME, replicas.toArray());
-                    ArrayList<Integer> isr = new ArrayList<Integer>();
-                    for (Node node : fetchPartitionData.inSyncReplicas())
-                        isr.add(node.id());
-                    partitionData.set(ISR_KEY_NAME, isr.toArray());
-                    partitionArray.add(partitionData);
-                }
-                topicData.set(PARTITION_METADATA_KEY_NAME, partitionArray.toArray());
-            }
-
-            topicArray.add(topicData);
-        }
-        struct.set(TOPIC_METATDATA_KEY_NAME, topicArray.toArray());
-
-        this.cluster = cluster;
-        this.errors = new HashMap<String, Errors>();
-    }
-
-    public MetadataResponse(Struct struct) {
-        super(struct);
-        Map<String, Errors> errors = new HashMap<String, Errors>();
-        Map<Integer, Node> brokers = new HashMap<Integer, Node>();
-        Object[] brokerStructs = (Object[]) struct.get(BROKERS_KEY_NAME);
-        for (int i = 0; i < brokerStructs.length; i++) {
-            Struct broker = (Struct) brokerStructs[i];
-            int nodeId = broker.getInt(NODE_ID_KEY_NAME);
-            String host = broker.getString(HOST_KEY_NAME);
-            int port = broker.getInt(PORT_KEY_NAME);
-            brokers.put(nodeId, new Node(nodeId, host, port));
-        }
-        List<PartitionInfo> partitions = new ArrayList<PartitionInfo>();
-        Object[] topicInfos = (Object[]) struct.get(TOPIC_METATDATA_KEY_NAME);
-        for (int i = 0; i < topicInfos.length; i++) {
-            Struct topicInfo = (Struct) topicInfos[i];
-            short topicError = topicInfo.getShort(TOPIC_ERROR_CODE_KEY_NAME);
-            String topic = topicInfo.getString(TOPIC_KEY_NAME);
-            if (topicError == Errors.NONE.code()) {
-                Object[] partitionInfos = (Object[]) topicInfo.get(PARTITION_METADATA_KEY_NAME);
-                for (int j = 0; j < partitionInfos.length; j++) {
-                    Struct partitionInfo = (Struct) partitionInfos[j];
-                    int partition = partitionInfo.getInt(PARTITION_KEY_NAME);
-                    int leader = partitionInfo.getInt(LEADER_KEY_NAME);
-                    Node leaderNode = leader == -1 ? null : brokers.get(leader);
-                    Object[] replicas = (Object[]) partitionInfo.get(REPLICAS_KEY_NAME);
-                    Node[] replicaNodes = new Node[replicas.length];
-                    for (int k = 0; k < replicas.length; k++)
-                        replicaNodes[k] = brokers.get(replicas[k]);
-                    Object[] isr = (Object[]) partitionInfo.get(ISR_KEY_NAME);
-                    Node[] isrNodes = new Node[isr.length];
-                    for (int k = 0; k < isr.length; k++)
-                        isrNodes[k] = brokers.get(isr[k]);
-                    partitions.add(new PartitionInfo(topic, partition, leaderNode, replicaNodes, isrNodes));
-                }
-            } else {
-                errors.put(topic, Errors.forCode(topicError));
-            }
-        }
-        this.errors = errors;
-        this.cluster = new Cluster(brokers.values(), partitions);
-    }
-
-    public Map<String, Errors> errors() {
-        return this.errors;
-    }
-
-    public Cluster cluster() {
-        return this.cluster;
-    }
-
-    public static MetadataResponse parse(ByteBuffer buffer) {
-        return new MetadataResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
deleted file mode 100644
index b33d2c1..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitRequest.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.Errors;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-/**
- * This wrapper supports both v0 and v1 of OffsetCommitRequest.
- */
-public class OffsetCommitRequest extends AbstractRequest {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.OFFSET_COMMIT.id);
-    private static final String GROUP_ID_KEY_NAME = "group_id";
-    private static final String GENERATION_ID_KEY_NAME = "group_generation_id";
-    private static final String CONSUMER_ID_KEY_NAME = "consumer_id";
-    private static final String TOPICS_KEY_NAME = "topics";
-    private static final String RETENTION_TIME_KEY_NAME = "retention_time";
-
-    // topic level field names
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    // partition level field names
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String COMMIT_OFFSET_KEY_NAME = "offset";
-    private static final String METADATA_KEY_NAME = "metadata";
-
-    @Deprecated
-    private static final String TIMESTAMP_KEY_NAME = "timestamp";         // for v0, v1
-
-    // default values for the current version
-    public static final int DEFAULT_GENERATION_ID = -1;
-    public static final String DEFAULT_CONSUMER_ID = "";
-    public static final long DEFAULT_RETENTION_TIME = -1L;
-
-    // default values for old versions,
-    // will be removed after these versions are deprecated
-    @Deprecated
-    public static final long DEFAULT_TIMESTAMP = -1L;            // for V0, V1
-
-    private final String groupId;
-    private final String consumerId;
-    private final int generationId;
-    private final long retentionTime;
-    private final Map<TopicPartition, PartitionData> offsetData;
-
-    public static final class PartitionData {
-        @Deprecated
-        public final long timestamp;                // for V1
-
-        public final long offset;
-        public final String metadata;
-
-        @Deprecated
-        public PartitionData(long offset, long timestamp, String metadata) {
-            this.offset = offset;
-            this.timestamp = timestamp;
-            this.metadata = metadata;
-        }
-
-        public PartitionData(long offset, String metadata) {
-            this(offset, DEFAULT_TIMESTAMP, metadata);
-        }
-    }
-
-    /**
-     * Constructor for version 0.
-     * @param groupId
-     * @param offsetData
-     */
-    @Deprecated
-    public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
-
-        initCommonFields(groupId, offsetData);
-        this.groupId = groupId;
-        this.generationId = DEFAULT_GENERATION_ID;
-        this.consumerId = DEFAULT_CONSUMER_ID;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
-        this.offsetData = offsetData;
-    }
-
-    /**
-     * Constructor for version 1.
-     * @param groupId
-     * @param generationId
-     * @param consumerId
-     * @param offsetData
-     */
-    @Deprecated
-    public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 1)));
-
-        initCommonFields(groupId, offsetData);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.retentionTime = DEFAULT_RETENTION_TIME;
-        this.offsetData = offsetData;
-    }
-
-    /**
-     * Constructor for version 2.
-     * @param groupId
-     * @param generationId
-     * @param consumerId
-     * @param retentionTime
-     * @param offsetData
-     */
-    public OffsetCommitRequest(String groupId, int generationId, String consumerId, long retentionTime, Map<TopicPartition, PartitionData> offsetData) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        initCommonFields(groupId, offsetData);
-        struct.set(GENERATION_ID_KEY_NAME, generationId);
-        struct.set(CONSUMER_ID_KEY_NAME, consumerId);
-        struct.set(RETENTION_TIME_KEY_NAME, retentionTime);
-        this.groupId = groupId;
-        this.generationId = generationId;
-        this.consumerId = consumerId;
-        this.retentionTime = retentionTime;
-        this.offsetData = offsetData;
-    }
-
-    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
-        Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
-
-        struct.set(GROUP_ID_KEY_NAME, groupId);
-        List<Struct> topicArray = new ArrayList<Struct>();
-
-        for (Map.Entry<String, Map<Integer, PartitionData>> topicEntry: topicsData.entrySet()) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : topicEntry.getValue().entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                // Only for v1
-                if (partitionData.hasField(TIMESTAMP_KEY_NAME))
-                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
-                partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
-    }
-
-    public OffsetCommitRequest(Struct struct) {
-        super(struct);
-
-        groupId = struct.getString(GROUP_ID_KEY_NAME);
-        // This field only exists in v1.
-        if (struct.hasField(GENERATION_ID_KEY_NAME))
-            generationId = struct.getInt(GENERATION_ID_KEY_NAME);
-        else
-            generationId = DEFAULT_GENERATION_ID;
-
-        // This field only exists in v1.
-        if (struct.hasField(CONSUMER_ID_KEY_NAME))
-            consumerId = struct.getString(CONSUMER_ID_KEY_NAME);
-        else
-            consumerId = DEFAULT_CONSUMER_ID;
-
-        // This field only exists in v2
-        if (struct.hasField(RETENTION_TIME_KEY_NAME))
-            retentionTime = struct.getLong(RETENTION_TIME_KEY_NAME);
-        else
-            retentionTime = DEFAULT_RETENTION_TIME;
-
-        offsetData = new HashMap<TopicPartition, PartitionData>();
-        for (Object topicDataObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.getString(TOPIC_KEY_NAME);
-            for (Object partitionDataObj : topicData.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionDataStruct = (Struct) partitionDataObj;
-                int partition = partitionDataStruct.getInt(PARTITION_KEY_NAME);
-                long offset = partitionDataStruct.getLong(COMMIT_OFFSET_KEY_NAME);
-                String metadata = partitionDataStruct.getString(METADATA_KEY_NAME);
-                PartitionData partitionOffset;
-                // This field only exists in v1
-                if (partitionDataStruct.hasField(TIMESTAMP_KEY_NAME)) {
-                    long timestamp = partitionDataStruct.getLong(TIMESTAMP_KEY_NAME);
-                    partitionOffset = new PartitionData(offset, timestamp, metadata);
-                } else {
-                    partitionOffset = new PartitionData(offset, metadata);
-                }
-                offsetData.put(new TopicPartition(topic, partition), partitionOffset);
-            }
-        }
-    }
-
-    @Override
-    public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) {
-        Map<TopicPartition, Short> responseData = new HashMap<TopicPartition, Short>();
-        for (Map.Entry<TopicPartition, PartitionData> entry: offsetData.entrySet()) {
-            responseData.put(entry.getKey(), Errors.forException(e).code());
-        }
-
-        switch (versionId) {
-            // OffsetCommitResponseV0 == OffsetCommitResponseV1 == OffsetCommitResponseV2
-            case 0:
-            case 1:
-            case 2:
-                return new OffsetCommitResponse(responseData);
-            default:
-                throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.OFFSET_COMMIT.id)));
-        }
-    }
-
-    public String groupId() {
-        return groupId;
-    }
-
-    public int generationId() {
-        return generationId;
-    }
-
-    public String consumerId() {
-        return consumerId;
-    }
-
-    public long retentionTime() {
-        return retentionTime;
-    }
-
-    public Map<TopicPartition, PartitionData> offsetData() {
-        return offsetData;
-    }
-
-    public static OffsetCommitRequest parse(ByteBuffer buffer, int versionId) {
-        Schema schema = ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, versionId);
-        return new OffsetCommitRequest((Struct) schema.read(buffer));
-    }
-
-    public static OffsetCommitRequest parse(ByteBuffer buffer) {
-        return new OffsetCommitRequest((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/76fcaca8/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
deleted file mode 100644
index 5f14b63..0000000
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka-083/src/main/java/org/apache/flink/kafka_backport/common/requests/OffsetCommitResponse.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.flink.kafka_backport.common.requests;
-
-import org.apache.flink.kafka_backport.common.protocol.types.Struct;
-import org.apache.flink.kafka_backport.common.TopicPartition;
-import org.apache.flink.kafka_backport.common.protocol.ApiKeys;
-import org.apache.flink.kafka_backport.common.protocol.ProtoUtils;
-import org.apache.flink.kafka_backport.common.protocol.types.Schema;
-import org.apache.flink.kafka_backport.common.utils.CollectionUtils;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-// ----------------------------------------------------------------------------
-//  This class is copied from the Apache Kafka project.
-// 
-//  The class is part of a "backport" of the new consumer API, in order to
-//  give Flink access to its functionality until the API is properly released.
-// 
-//  This is a temporary workaround!
-// ----------------------------------------------------------------------------
-
-public class OffsetCommitResponse extends AbstractRequestResponse {
-    
-    private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.OFFSET_COMMIT.id);
-    private static final String RESPONSES_KEY_NAME = "responses";
-
-    // topic level fields
-    private static final String TOPIC_KEY_NAME = "topic";
-    private static final String PARTITIONS_KEY_NAME = "partition_responses";
-
-    // partition level fields
-    private static final String PARTITION_KEY_NAME = "partition";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-
-    /**
-     * Possible error code:
-     *
-     * OFFSET_METADATA_TOO_LARGE (12)
-     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
-     * NOT_COORDINATOR_FOR_CONSUMER (16)
-     * ILLEGAL_GENERATION (22)
-     * UNKNOWN_CONSUMER_ID (25)
-     * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
-     * INVALID_COMMIT_OFFSET_SIZE (28)
-     */
-
-    private final Map<TopicPartition, Short> responseData;
-
-    public OffsetCommitResponse(Map<TopicPartition, Short> responseData) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        Map<String, Map<Integer, Short>> topicsData = CollectionUtils.groupDataByTopic(responseData);
-
-        List<Struct> topicArray = new ArrayList<Struct>();
-        for (Map.Entry<String, Map<Integer, Short>> entries: topicsData.entrySet()) {
-            Struct topicData = struct.instance(RESPONSES_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, entries.getKey());
-            List<Struct> partitionArray = new ArrayList<Struct>();
-            for (Map.Entry<Integer, Short> partitionEntry : entries.getValue().entrySet()) {
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(ERROR_CODE_KEY_NAME, partitionEntry.getValue());
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(RESPONSES_KEY_NAME, topicArray.toArray());
-        this.responseData = responseData;
-    }
-
-    public OffsetCommitResponse(Struct struct) {
-        super(struct);
-        responseData = new HashMap<TopicPartition, Short>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.getString(TOPIC_KEY_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
-                short errorCode = partitionResponse.getShort(ERROR_CODE_KEY_NAME);
-                responseData.put(new TopicPartition(topic, partition), errorCode);
-            }
-        }
-    }
-
-    public Map<TopicPartition, Short> responseData() {
-        return responseData;
-    }
-
-    public static OffsetCommitResponse parse(ByteBuffer buffer) {
-        return new OffsetCommitResponse((Struct) CURRENT_SCHEMA.read(buffer));
-    }
-}


Mime
View raw message